Integration test merge: upstream/main into flow-sec security branch (prefer flow-sec on conflicts)

This commit is contained in:
David Osipov
2026-03-19 19:42:04 +04:00
32 changed files with 3371 additions and 206 deletions

View File

@@ -195,6 +195,8 @@ pub(super) struct ZeroPoolData {
pub(super) pool_swap_total: u64,
pub(super) pool_drain_active: u64,
pub(super) pool_force_close_total: u64,
pub(super) pool_drain_soft_evict_total: u64,
pub(super) pool_drain_soft_evict_writer_total: u64,
pub(super) pool_stale_pick_total: u64,
pub(super) writer_removed_total: u64,
pub(super) writer_removed_unexpected_total: u64,
@@ -235,6 +237,7 @@ pub(super) struct MeWritersSummary {
pub(super) available_pct: f64,
pub(super) required_writers: usize,
pub(super) alive_writers: usize,
pub(super) coverage_ratio: f64,
pub(super) coverage_pct: f64,
pub(super) fresh_alive_writers: usize,
pub(super) fresh_coverage_pct: f64,
@@ -283,6 +286,7 @@ pub(super) struct DcStatus {
pub(super) floor_max: usize,
pub(super) floor_capped: bool,
pub(super) alive_writers: usize,
pub(super) coverage_ratio: f64,
pub(super) coverage_pct: f64,
pub(super) fresh_alive_writers: usize,
pub(super) fresh_coverage_pct: f64,
@@ -360,6 +364,11 @@ pub(super) struct MinimalMeRuntimeData {
pub(super) me_reconnect_backoff_cap_ms: u64,
pub(super) me_reconnect_fast_retry_count: u32,
pub(super) me_pool_drain_ttl_secs: u64,
pub(super) me_pool_drain_soft_evict_enabled: bool,
pub(super) me_pool_drain_soft_evict_grace_secs: u64,
pub(super) me_pool_drain_soft_evict_per_writer: u8,
pub(super) me_pool_drain_soft_evict_budget_per_core: u16,
pub(super) me_pool_drain_soft_evict_cooldown_ms: u64,
pub(super) me_pool_force_close_secs: u64,
pub(super) me_pool_min_fresh_ratio: f32,
pub(super) me_bind_stale_mode: &'static str,

View File

@@ -113,6 +113,7 @@ pub(super) struct RuntimeMeQualityDcRttData {
pub(super) rtt_ema_ms: Option<f64>,
pub(super) alive_writers: usize,
pub(super) required_writers: usize,
pub(super) coverage_ratio: f64,
pub(super) coverage_pct: f64,
}
@@ -388,6 +389,7 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
rtt_ema_ms: dc.rtt_ms,
alive_writers: dc.alive_writers,
required_writers: dc.required_writers,
coverage_ratio: dc.coverage_ratio,
coverage_pct: dc.coverage_pct,
})
.collect(),

View File

@@ -96,6 +96,8 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
pool_swap_total: stats.get_pool_swap_total(),
pool_drain_active: stats.get_pool_drain_active(),
pool_force_close_total: stats.get_pool_force_close_total(),
pool_drain_soft_evict_total: stats.get_pool_drain_soft_evict_total(),
pool_drain_soft_evict_writer_total: stats.get_pool_drain_soft_evict_writer_total(),
pool_stale_pick_total: stats.get_pool_stale_pick_total(),
writer_removed_total: stats.get_me_writer_removed_total(),
writer_removed_unexpected_total: stats.get_me_writer_removed_unexpected_total(),
@@ -313,6 +315,7 @@ async fn get_minimal_payload_cached(
available_pct: status.available_pct,
required_writers: status.required_writers,
alive_writers: status.alive_writers,
coverage_ratio: status.coverage_ratio,
coverage_pct: status.coverage_pct,
fresh_alive_writers: status.fresh_alive_writers,
fresh_coverage_pct: status.fresh_coverage_pct,
@@ -370,6 +373,7 @@ async fn get_minimal_payload_cached(
floor_max: entry.floor_max,
floor_capped: entry.floor_capped,
alive_writers: entry.alive_writers,
coverage_ratio: entry.coverage_ratio,
coverage_pct: entry.coverage_pct,
fresh_alive_writers: entry.fresh_alive_writers,
fresh_coverage_pct: entry.fresh_coverage_pct,
@@ -427,6 +431,11 @@ async fn get_minimal_payload_cached(
me_reconnect_backoff_cap_ms: runtime.me_reconnect_backoff_cap_ms,
me_reconnect_fast_retry_count: runtime.me_reconnect_fast_retry_count,
me_pool_drain_ttl_secs: runtime.me_pool_drain_ttl_secs,
me_pool_drain_soft_evict_enabled: runtime.me_pool_drain_soft_evict_enabled,
me_pool_drain_soft_evict_grace_secs: runtime.me_pool_drain_soft_evict_grace_secs,
me_pool_drain_soft_evict_per_writer: runtime.me_pool_drain_soft_evict_per_writer,
me_pool_drain_soft_evict_budget_per_core: runtime.me_pool_drain_soft_evict_budget_per_core,
me_pool_drain_soft_evict_cooldown_ms: runtime.me_pool_drain_soft_evict_cooldown_ms,
me_pool_force_close_secs: runtime.me_pool_force_close_secs,
me_pool_min_fresh_ratio: runtime.me_pool_min_fresh_ratio,
me_bind_stale_mode: runtime.me_bind_stale_mode,
@@ -495,6 +504,7 @@ fn disabled_me_writers(now_epoch_secs: u64, reason: &'static str) -> MeWritersDa
available_pct: 0.0,
required_writers: 0,
alive_writers: 0,
coverage_ratio: 0.0,
coverage_pct: 0.0,
fresh_alive_writers: 0,
fresh_coverage_pct: 0.0,

View File

@@ -27,8 +27,8 @@ const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 1024;
const DEFAULT_ME_READER_ROUTE_DATA_WAIT_MS: u64 = 2;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_FRAMES: usize = 32;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES: usize = 128 * 1024;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 1500;
const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = false;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 500;
const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = true;
const DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES: usize = 64 * 1024;
const DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES: usize = 256 * 1024;
const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3;
@@ -36,7 +36,16 @@ const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000;
const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000;
const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000;
const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000;
const DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS: u64 = 3000;
const DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS: u64 = 250;
const DEFAULT_ME_C2ME_SEND_TIMEOUT_MS: u64 = 4000;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000;
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250;
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000;
@@ -87,11 +96,11 @@ pub(crate) fn default_connect_timeout() -> u64 {
}
pub(crate) fn default_keepalive() -> u64 {
60
15
}
pub(crate) fn default_ack_timeout() -> u64 {
300
90
}
pub(crate) fn default_me_one_retry() -> u8 {
12
@@ -153,6 +162,10 @@ pub(crate) fn default_server_max_connections() -> u32 {
10_000
}
pub(crate) fn default_accept_permit_timeout_ms() -> u64 {
DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS
}
pub(crate) fn default_prefer_4() -> u8 {
4
}
@@ -377,6 +390,18 @@ pub(crate) fn default_me_warn_rate_limit_ms() -> u64 {
DEFAULT_ME_WARN_RATE_LIMIT_MS
}
pub(crate) fn default_me_route_hybrid_max_wait_ms() -> u64 {
DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS
}
pub(crate) fn default_me_route_blocking_send_timeout_ms() -> u64 {
DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS
}
pub(crate) fn default_me_c2me_send_timeout_ms() -> u64 {
DEFAULT_ME_C2ME_SEND_TIMEOUT_MS
}
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
}
@@ -594,6 +619,26 @@ pub(crate) fn default_me_pool_drain_threshold() -> u64 {
128
}
pub(crate) fn default_me_pool_drain_soft_evict_enabled() -> bool {
DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED
}
pub(crate) fn default_me_pool_drain_soft_evict_grace_secs() -> u64 {
DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS
}
pub(crate) fn default_me_pool_drain_soft_evict_per_writer() -> u8 {
DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER
}
pub(crate) fn default_me_pool_drain_soft_evict_budget_per_core() -> u16 {
DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE
}
pub(crate) fn default_me_pool_drain_soft_evict_cooldown_ms() -> u64 {
DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS
}
pub(crate) fn default_me_bind_stale_ttl_secs() -> u64 {
default_me_pool_drain_ttl_secs()
}

View File

@@ -346,6 +346,12 @@ impl ProxyConfig {
));
}
if config.general.me_c2me_send_timeout_ms > 60_000 {
return Err(ProxyError::Config(
"general.me_c2me_send_timeout_ms must be within [0, 60000]".to_string(),
));
}
if config.general.me_reader_route_data_wait_ms > 20 {
return Err(ProxyError::Config(
"general.me_reader_route_data_wait_ms must be within [0, 20]".to_string(),
@@ -406,6 +412,35 @@ impl ProxyConfig {
));
}
if config.general.me_pool_drain_soft_evict_grace_secs > 3600 {
return Err(ProxyError::Config(
"general.me_pool_drain_soft_evict_grace_secs must be within [0, 3600]".to_string(),
));
}
if config.general.me_pool_drain_soft_evict_per_writer == 0
|| config.general.me_pool_drain_soft_evict_per_writer > 16
{
return Err(ProxyError::Config(
"general.me_pool_drain_soft_evict_per_writer must be within [1, 16]".to_string(),
));
}
if config.general.me_pool_drain_soft_evict_budget_per_core == 0
|| config.general.me_pool_drain_soft_evict_budget_per_core > 64
{
return Err(ProxyError::Config(
"general.me_pool_drain_soft_evict_budget_per_core must be within [1, 64]"
.to_string(),
));
}
if config.general.me_pool_drain_soft_evict_cooldown_ms == 0 {
return Err(ProxyError::Config(
"general.me_pool_drain_soft_evict_cooldown_ms must be > 0".to_string(),
));
}
if config.access.user_max_unique_ips_window_secs == 0 {
return Err(ProxyError::Config(
"access.user_max_unique_ips_window_secs must be > 0".to_string(),
@@ -577,6 +612,11 @@ impl ProxyConfig {
"general.me_route_backpressure_base_timeout_ms must be > 0".to_string(),
));
}
if config.general.me_route_backpressure_base_timeout_ms > 5000 {
return Err(ProxyError::Config(
"general.me_route_backpressure_base_timeout_ms must be within [1, 5000]".to_string(),
));
}
if config.general.me_route_backpressure_high_timeout_ms
< config.general.me_route_backpressure_base_timeout_ms
@@ -585,6 +625,11 @@ impl ProxyConfig {
"general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(),
));
}
if config.general.me_route_backpressure_high_timeout_ms > 5000 {
return Err(ProxyError::Config(
"general.me_route_backpressure_high_timeout_ms must be within [1, 5000]".to_string(),
));
}
if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) {
return Err(ProxyError::Config(
@@ -598,6 +643,18 @@ impl ProxyConfig {
));
}
if !(50..=60_000).contains(&config.general.me_route_hybrid_max_wait_ms) {
return Err(ProxyError::Config(
"general.me_route_hybrid_max_wait_ms must be within [50, 60000]".to_string(),
));
}
if config.general.me_route_blocking_send_timeout_ms > 5000 {
return Err(ProxyError::Config(
"general.me_route_blocking_send_timeout_ms must be within [0, 5000]".to_string(),
));
}
if !(2..=4).contains(&config.general.me_writer_pick_sample_size) {
return Err(ProxyError::Config(
"general.me_writer_pick_sample_size must be within [2, 4]".to_string(),
@@ -658,6 +715,12 @@ impl ProxyConfig {
));
}
if config.server.accept_permit_timeout_ms > 60_000 {
return Err(ProxyError::Config(
"server.accept_permit_timeout_ms must be within [0, 60000]".to_string(),
));
}
if config.general.effective_me_pool_force_close_secs() > 0
&& config.general.effective_me_pool_force_close_secs()
< config.general.me_pool_drain_ttl_secs
@@ -1571,6 +1634,47 @@ mod tests {
let _ = std::fs::remove_file(path_valid);
}
#[test]
fn me_route_backpressure_base_timeout_ms_out_of_range_is_rejected() {
let toml = r#"
[general]
me_route_backpressure_base_timeout_ms = 5001
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_route_backpressure_base_timeout_ms_out_of_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_route_backpressure_base_timeout_ms must be within [1, 5000]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn me_route_backpressure_high_timeout_ms_out_of_range_is_rejected() {
let toml = r#"
[general]
me_route_backpressure_base_timeout_ms = 100
me_route_backpressure_high_timeout_ms = 5001
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_route_backpressure_high_timeout_ms_out_of_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_route_backpressure_high_timeout_ms must be within [1, 5000]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn me_route_no_writer_wait_ms_out_of_range_is_rejected() {
let toml = r#"

View File

@@ -462,6 +462,11 @@ pub struct GeneralConfig {
#[serde(default = "default_me_c2me_channel_capacity")]
pub me_c2me_channel_capacity: usize,
/// Maximum wait in milliseconds for enqueueing C2ME commands when the queue is full.
/// `0` keeps legacy unbounded wait behavior.
#[serde(default = "default_me_c2me_send_timeout_ms")]
pub me_c2me_send_timeout_ms: u64,
/// Bounded wait in milliseconds for routing ME DATA to per-connection queue.
/// `0` keeps legacy no-wait behavior.
#[serde(default = "default_me_reader_route_data_wait_ms")]
@@ -716,6 +721,15 @@ pub struct GeneralConfig {
#[serde(default = "default_me_route_no_writer_wait_ms")]
pub me_route_no_writer_wait_ms: u64,
/// Maximum cumulative wait in milliseconds for hybrid no-writer mode before failfast.
#[serde(default = "default_me_route_hybrid_max_wait_ms")]
pub me_route_hybrid_max_wait_ms: u64,
/// Maximum wait in milliseconds for blocking ME writer channel send fallback.
/// `0` keeps legacy unbounded wait behavior.
#[serde(default = "default_me_route_blocking_send_timeout_ms")]
pub me_route_blocking_send_timeout_ms: u64,
/// Number of inline recovery attempts in legacy mode.
#[serde(default = "default_me_route_inline_recovery_attempts")]
pub me_route_inline_recovery_attempts: u32,
@@ -803,6 +817,26 @@ pub struct GeneralConfig {
#[serde(default = "default_me_pool_drain_threshold")]
pub me_pool_drain_threshold: u64,
/// Enable staged client eviction for draining ME writers that remain non-empty past TTL.
#[serde(default = "default_me_pool_drain_soft_evict_enabled")]
pub me_pool_drain_soft_evict_enabled: bool,
/// Extra grace in seconds after drain TTL before soft-eviction stage starts.
#[serde(default = "default_me_pool_drain_soft_evict_grace_secs")]
pub me_pool_drain_soft_evict_grace_secs: u64,
/// Maximum number of client sessions to evict from one draining writer per health tick.
#[serde(default = "default_me_pool_drain_soft_evict_per_writer")]
pub me_pool_drain_soft_evict_per_writer: u8,
/// Soft-eviction budget per CPU core for one health tick.
#[serde(default = "default_me_pool_drain_soft_evict_budget_per_core")]
pub me_pool_drain_soft_evict_budget_per_core: u16,
/// Cooldown for repetitive soft-eviction on the same writer in milliseconds.
#[serde(default = "default_me_pool_drain_soft_evict_cooldown_ms")]
pub me_pool_drain_soft_evict_cooldown_ms: u64,
/// Policy for new binds on stale draining writers.
#[serde(default)]
pub me_bind_stale_mode: MeBindStaleMode,
@@ -901,6 +935,7 @@ impl Default for GeneralConfig {
me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(),
me_route_channel_capacity: default_me_route_channel_capacity(),
me_c2me_channel_capacity: default_me_c2me_channel_capacity(),
me_c2me_send_timeout_ms: default_me_c2me_send_timeout_ms(),
me_reader_route_data_wait_ms: default_me_reader_route_data_wait_ms(),
me_d2c_flush_batch_max_frames: default_me_d2c_flush_batch_max_frames(),
me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(),
@@ -955,6 +990,8 @@ impl Default for GeneralConfig {
me_warn_rate_limit_ms: default_me_warn_rate_limit_ms(),
me_route_no_writer_mode: MeRouteNoWriterMode::default(),
me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(),
me_route_hybrid_max_wait_ms: default_me_route_hybrid_max_wait_ms(),
me_route_blocking_send_timeout_ms: default_me_route_blocking_send_timeout_ms(),
me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(),
me_route_inline_recovery_wait_ms: default_me_route_inline_recovery_wait_ms(),
links: LinksConfig::default(),
@@ -984,6 +1021,13 @@ impl Default for GeneralConfig {
proxy_secret_len_max: default_proxy_secret_len_max(),
me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(),
me_pool_drain_threshold: default_me_pool_drain_threshold(),
me_pool_drain_soft_evict_enabled: default_me_pool_drain_soft_evict_enabled(),
me_pool_drain_soft_evict_grace_secs: default_me_pool_drain_soft_evict_grace_secs(),
me_pool_drain_soft_evict_per_writer: default_me_pool_drain_soft_evict_per_writer(),
me_pool_drain_soft_evict_budget_per_core:
default_me_pool_drain_soft_evict_budget_per_core(),
me_pool_drain_soft_evict_cooldown_ms:
default_me_pool_drain_soft_evict_cooldown_ms(),
me_bind_stale_mode: MeBindStaleMode::default(),
me_bind_stale_ttl_secs: default_me_bind_stale_ttl_secs(),
me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(),
@@ -1187,6 +1231,11 @@ pub struct ServerConfig {
/// 0 means unlimited.
#[serde(default = "default_server_max_connections")]
pub max_connections: u32,
/// Maximum wait in milliseconds while acquiring a connection slot permit.
/// `0` keeps legacy unbounded wait behavior.
#[serde(default = "default_accept_permit_timeout_ms")]
pub accept_permit_timeout_ms: u64,
}
impl Default for ServerConfig {
@@ -1207,6 +1256,7 @@ impl Default for ServerConfig {
api: ApiConfig::default(),
listeners: Vec::new(),
max_connections: default_server_max_connections(),
accept_permit_timeout_ms: default_accept_permit_timeout_ms(),
}
}
}

View File

@@ -253,6 +253,7 @@ pub(crate) fn format_uptime(total_secs: u64) -> String {
format!("{} / {} seconds", parts.join(", "), total_secs)
}
#[allow(dead_code)]
pub(crate) async fn wait_until_admission_open(admission_rx: &mut watch::Receiver<bool>) -> bool {
loop {
if *admission_rx.borrow() {

View File

@@ -24,7 +24,7 @@ use crate::transport::{
ListenOptions, UpstreamManager, create_listener, find_listener_processes,
};
use super::helpers::{is_expected_handshake_eof, print_proxy_links, wait_until_admission_open};
use super::helpers::{is_expected_handshake_eof, print_proxy_links};
pub(crate) struct BoundListeners {
pub(crate) listeners: Vec<(TcpListener, bool)>,
@@ -195,7 +195,7 @@ pub(crate) async fn bind_listeners(
has_unix_listener = true;
let mut config_rx_unix: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
let mut admission_rx_unix = admission_rx.clone();
let admission_rx_unix = admission_rx.clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone();
@@ -212,17 +212,44 @@ pub(crate) async fn bind_listeners(
let unix_conn_counter = Arc::new(std::sync::atomic::AtomicU64::new(1));
loop {
if !wait_until_admission_open(&mut admission_rx_unix).await {
warn!("Conditional-admission gate channel closed for unix listener");
break;
}
match unix_listener.accept().await {
Ok((stream, _)) => {
let permit = match max_connections_unix.clone().acquire_owned().await {
Ok(permit) => permit,
Err(_) => {
error!("Connection limiter is closed");
break;
if !*admission_rx_unix.borrow() {
drop(stream);
continue;
}
let accept_permit_timeout_ms = config_rx_unix
.borrow()
.server
.accept_permit_timeout_ms;
let permit = if accept_permit_timeout_ms == 0 {
match max_connections_unix.clone().acquire_owned().await {
Ok(permit) => permit,
Err(_) => {
error!("Connection limiter is closed");
break;
}
}
} else {
match tokio::time::timeout(
Duration::from_millis(accept_permit_timeout_ms),
max_connections_unix.clone().acquire_owned(),
)
.await
{
Ok(Ok(permit)) => permit,
Ok(Err(_)) => {
error!("Connection limiter is closed");
break;
}
Err(_) => {
debug!(
timeout_ms = accept_permit_timeout_ms,
"Dropping accepted unix connection: permit wait timeout"
);
drop(stream);
continue;
}
}
};
let conn_id =
@@ -312,7 +339,7 @@ pub(crate) fn spawn_tcp_accept_loops(
) {
for (listener, listener_proxy_protocol) in listeners {
let mut config_rx: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
let mut admission_rx_tcp = admission_rx.clone();
let admission_rx_tcp = admission_rx.clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone();
@@ -327,17 +354,46 @@ pub(crate) fn spawn_tcp_accept_loops(
tokio::spawn(async move {
loop {
if !wait_until_admission_open(&mut admission_rx_tcp).await {
warn!("Conditional-admission gate channel closed for tcp listener");
break;
}
match listener.accept().await {
Ok((stream, peer_addr)) => {
let permit = match max_connections_tcp.clone().acquire_owned().await {
Ok(permit) => permit,
Err(_) => {
error!("Connection limiter is closed");
break;
if !*admission_rx_tcp.borrow() {
debug!(peer = %peer_addr, "Admission gate closed, dropping connection");
drop(stream);
continue;
}
let accept_permit_timeout_ms = config_rx
.borrow()
.server
.accept_permit_timeout_ms;
let permit = if accept_permit_timeout_ms == 0 {
match max_connections_tcp.clone().acquire_owned().await {
Ok(permit) => permit,
Err(_) => {
error!("Connection limiter is closed");
break;
}
}
} else {
match tokio::time::timeout(
Duration::from_millis(accept_permit_timeout_ms),
max_connections_tcp.clone().acquire_owned(),
)
.await
{
Ok(Ok(permit)) => permit,
Ok(Err(_)) => {
error!("Connection limiter is closed");
break;
}
Err(_) => {
debug!(
peer = %peer_addr,
timeout_ms = accept_permit_timeout_ms,
"Dropping accepted connection: permit wait timeout"
);
drop(stream);
continue;
}
}
};
let config = config_rx.borrow_and_update().clone();

View File

@@ -238,6 +238,11 @@ pub(crate) async fn initialize_me_pool(
config.general.hardswap,
config.general.me_pool_drain_ttl_secs,
config.general.me_pool_drain_threshold,
config.general.me_pool_drain_soft_evict_enabled,
config.general.me_pool_drain_soft_evict_grace_secs,
config.general.me_pool_drain_soft_evict_per_writer,
config.general.me_pool_drain_soft_evict_budget_per_core,
config.general.me_pool_drain_soft_evict_cooldown_ms,
config.general.effective_me_pool_force_close_secs(),
config.general.me_pool_min_fresh_ratio,
config.general.me_hardswap_warmup_delay_min_ms,
@@ -262,6 +267,8 @@ pub(crate) async fn initialize_me_pool(
config.general.me_warn_rate_limit_ms,
config.general.me_route_no_writer_mode,
config.general.me_route_no_writer_wait_ms,
config.general.me_route_hybrid_max_wait_ms,
config.general.me_route_blocking_send_timeout_ms,
config.general.me_route_inline_recovery_attempts,
config.general.me_route_inline_recovery_wait_ms,
);

View File

@@ -484,7 +484,7 @@ pub async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
Duration::from_secs(config.access.replay_window_secs),
));
let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096));
let buffer_pool = Arc::new(BufferPool::with_config(64 * 1024, 4096));
connectivity::run_startup_connectivity(
&config,

View File

@@ -292,6 +292,109 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
"telemt_connections_bad_total {}",
if core_enabled { stats.get_connects_bad() } else { 0 }
);
let _ = writeln!(out, "# HELP telemt_connections_current Current active connections");
let _ = writeln!(out, "# TYPE telemt_connections_current gauge");
let _ = writeln!(
out,
"telemt_connections_current {}",
if core_enabled {
stats.get_current_connections_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_connections_direct_current Current active direct connections");
let _ = writeln!(out, "# TYPE telemt_connections_direct_current gauge");
let _ = writeln!(
out,
"telemt_connections_direct_current {}",
if core_enabled {
stats.get_current_connections_direct()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_connections_me_current Current active middle-end connections");
let _ = writeln!(out, "# TYPE telemt_connections_me_current gauge");
let _ = writeln!(
out,
"telemt_connections_me_current {}",
if core_enabled {
stats.get_current_connections_me()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_relay_adaptive_promotions_total Adaptive relay tier promotions"
);
let _ = writeln!(out, "# TYPE telemt_relay_adaptive_promotions_total counter");
let _ = writeln!(
out,
"telemt_relay_adaptive_promotions_total {}",
if core_enabled {
stats.get_relay_adaptive_promotions_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_relay_adaptive_demotions_total Adaptive relay tier demotions"
);
let _ = writeln!(out, "# TYPE telemt_relay_adaptive_demotions_total counter");
let _ = writeln!(
out,
"telemt_relay_adaptive_demotions_total {}",
if core_enabled {
stats.get_relay_adaptive_demotions_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_relay_adaptive_hard_promotions_total Adaptive relay hard promotions triggered by write pressure"
);
let _ = writeln!(
out,
"# TYPE telemt_relay_adaptive_hard_promotions_total counter"
);
let _ = writeln!(
out,
"telemt_relay_adaptive_hard_promotions_total {}",
if core_enabled {
stats.get_relay_adaptive_hard_promotions_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_reconnect_evict_total Reconnect-driven session evictions");
let _ = writeln!(out, "# TYPE telemt_reconnect_evict_total counter");
let _ = writeln!(
out,
"telemt_reconnect_evict_total {}",
if core_enabled {
stats.get_reconnect_evict_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_reconnect_stale_close_total Sessions closed because they became stale after reconnect"
);
let _ = writeln!(out, "# TYPE telemt_reconnect_stale_close_total counter");
let _ = writeln!(
out,
"telemt_reconnect_stale_close_total {}",
if core_enabled {
stats.get_reconnect_stale_close_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_handshake_timeouts_total Handshake timeouts");
let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter");
@@ -1547,6 +1650,36 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_pool_drain_soft_evict_total Soft-evicted client sessions on stuck draining writers"
);
let _ = writeln!(out, "# TYPE telemt_pool_drain_soft_evict_total counter");
let _ = writeln!(
out,
"telemt_pool_drain_soft_evict_total {}",
if me_allows_normal {
stats.get_pool_drain_soft_evict_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_pool_drain_soft_evict_writer_total Draining writers with at least one soft eviction"
);
let _ = writeln!(out, "# TYPE telemt_pool_drain_soft_evict_writer_total counter");
let _ = writeln!(
out,
"telemt_pool_drain_soft_evict_writer_total {}",
if me_allows_normal {
stats.get_pool_drain_soft_evict_writer_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_pool_stale_pick_total Stale writer fallback picks for new binds");
let _ = writeln!(out, "# TYPE telemt_pool_stale_pick_total counter");
let _ = writeln!(
@@ -1559,6 +1692,57 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_close_signal_drop_total Close-signal drops for already-removed ME writers"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_close_signal_drop_total counter");
let _ = writeln!(
out,
"telemt_me_writer_close_signal_drop_total {}",
if me_allows_normal {
stats.get_me_writer_close_signal_drop_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_close_signal_channel_full_total Close-signal drops caused by full writer command channels"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"
);
let _ = writeln!(
out,
"telemt_me_writer_close_signal_channel_full_total {}",
if me_allows_normal {
stats.get_me_writer_close_signal_channel_full_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_draining_writers_reap_progress_total Draining-writer removals processed by reap cleanup"
);
let _ = writeln!(
out,
"# TYPE telemt_me_draining_writers_reap_progress_total counter"
);
let _ = writeln!(
out,
"telemt_me_draining_writers_reap_progress_total {}",
if me_allows_normal {
stats.get_me_draining_writers_reap_progress_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
let _ = writeln!(
@@ -1864,6 +2048,8 @@ mod tests {
stats.increment_connects_all();
stats.increment_connects_all();
stats.increment_connects_bad();
stats.increment_current_connections_direct();
stats.increment_current_connections_me();
stats.increment_handshake_timeouts();
stats.increment_upstream_connect_attempt_total();
stats.increment_upstream_connect_attempt_total();
@@ -1895,6 +2081,9 @@ mod tests {
assert!(output.contains("telemt_connections_total 2"));
assert!(output.contains("telemt_connections_bad_total 1"));
assert!(output.contains("telemt_connections_current 2"));
assert!(output.contains("telemt_connections_direct_current 1"));
assert!(output.contains("telemt_connections_me_current 1"));
assert!(output.contains("telemt_handshake_timeouts_total 1"));
assert!(output.contains("telemt_upstream_connect_attempt_total 2"));
assert!(output.contains("telemt_upstream_connect_success_total 1"));
@@ -1937,6 +2126,9 @@ mod tests {
let output = render_metrics(&stats, &config, &tracker).await;
assert!(output.contains("telemt_connections_total 0"));
assert!(output.contains("telemt_connections_bad_total 0"));
assert!(output.contains("telemt_connections_current 0"));
assert!(output.contains("telemt_connections_direct_current 0"));
assert!(output.contains("telemt_connections_me_current 0"));
assert!(output.contains("telemt_handshake_timeouts_total 0"));
assert!(output.contains("telemt_user_unique_ips_current{user="));
assert!(output.contains("telemt_user_unique_ips_recent_window{user="));
@@ -1970,11 +2162,28 @@ mod tests {
assert!(output.contains("# TYPE telemt_uptime_seconds gauge"));
assert!(output.contains("# TYPE telemt_connections_total counter"));
assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
assert!(output.contains("# TYPE telemt_connections_current gauge"));
assert!(output.contains("# TYPE telemt_connections_direct_current gauge"));
assert!(output.contains("# TYPE telemt_connections_me_current gauge"));
assert!(output.contains("# TYPE telemt_relay_adaptive_promotions_total counter"));
assert!(output.contains("# TYPE telemt_relay_adaptive_demotions_total counter"));
assert!(output.contains("# TYPE telemt_relay_adaptive_hard_promotions_total counter"));
assert!(output.contains("# TYPE telemt_reconnect_evict_total counter"));
assert!(output.contains("# TYPE telemt_reconnect_stale_close_total counter"));
assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
assert!(output.contains("# TYPE telemt_upstream_connect_attempt_total counter"));
assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter"));
assert!(output.contains(
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"
));
assert!(output.contains(
"# TYPE telemt_me_draining_writers_reap_progress_total counter"
));
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter"));
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter"));
assert!(output.contains(
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
));

View File

@@ -0,0 +1,383 @@
use dashmap::DashMap;
use std::cmp::max;
use std::sync::OnceLock;
use std::time::{Duration, Instant};
const EMA_ALPHA: f64 = 0.2;
const PROFILE_TTL: Duration = Duration::from_secs(300);
const THROUGHPUT_UP_BPS: f64 = 8_000_000.0;
const THROUGHPUT_DOWN_BPS: f64 = 2_000_000.0;
const RATIO_CONFIRM_THRESHOLD: f64 = 1.12;
const TIER1_HOLD_TICKS: u32 = 8;
const TIER2_HOLD_TICKS: u32 = 4;
const QUIET_DEMOTE_TICKS: u32 = 480;
const HARD_COOLDOWN_TICKS: u32 = 20;
const HARD_PENDING_THRESHOLD: u32 = 3;
const HARD_PARTIAL_RATIO_THRESHOLD: f64 = 0.25;
const DIRECT_C2S_CAP_BYTES: usize = 128 * 1024;
const DIRECT_S2C_CAP_BYTES: usize = 512 * 1024;
const ME_FRAMES_CAP: usize = 96;
const ME_BYTES_CAP: usize = 384 * 1024;
const ME_DELAY_MIN_US: u64 = 150;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdaptiveTier {
Base = 0,
Tier1 = 1,
Tier2 = 2,
Tier3 = 3,
}
impl AdaptiveTier {
pub fn promote(self) -> Self {
match self {
Self::Base => Self::Tier1,
Self::Tier1 => Self::Tier2,
Self::Tier2 => Self::Tier3,
Self::Tier3 => Self::Tier3,
}
}
pub fn demote(self) -> Self {
match self {
Self::Base => Self::Base,
Self::Tier1 => Self::Base,
Self::Tier2 => Self::Tier1,
Self::Tier3 => Self::Tier2,
}
}
fn ratio(self) -> (usize, usize) {
match self {
Self::Base => (1, 1),
Self::Tier1 => (5, 4),
Self::Tier2 => (3, 2),
Self::Tier3 => (2, 1),
}
}
pub fn as_u8(self) -> u8 {
self as u8
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TierTransitionReason {
SoftConfirmed,
HardPressure,
QuietDemotion,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TierTransition {
pub from: AdaptiveTier,
pub to: AdaptiveTier,
pub reason: TierTransitionReason,
}
#[derive(Debug, Clone, Copy, Default)]
pub struct RelaySignalSample {
pub c2s_bytes: u64,
pub s2c_requested_bytes: u64,
pub s2c_written_bytes: u64,
pub s2c_write_ops: u64,
pub s2c_partial_writes: u64,
pub s2c_consecutive_pending_writes: u32,
}
#[derive(Debug, Clone, Copy)]
pub struct SessionAdaptiveController {
tier: AdaptiveTier,
max_tier_seen: AdaptiveTier,
throughput_ema_bps: f64,
incoming_ema_bps: f64,
outgoing_ema_bps: f64,
tier1_hold_ticks: u32,
tier2_hold_ticks: u32,
quiet_ticks: u32,
hard_cooldown_ticks: u32,
}
impl SessionAdaptiveController {
pub fn new(initial_tier: AdaptiveTier) -> Self {
Self {
tier: initial_tier,
max_tier_seen: initial_tier,
throughput_ema_bps: 0.0,
incoming_ema_bps: 0.0,
outgoing_ema_bps: 0.0,
tier1_hold_ticks: 0,
tier2_hold_ticks: 0,
quiet_ticks: 0,
hard_cooldown_ticks: 0,
}
}
pub fn max_tier_seen(&self) -> AdaptiveTier {
self.max_tier_seen
}
pub fn observe(&mut self, sample: RelaySignalSample, tick_secs: f64) -> Option<TierTransition> {
if tick_secs <= f64::EPSILON {
return None;
}
if self.hard_cooldown_ticks > 0 {
self.hard_cooldown_ticks -= 1;
}
let c2s_bps = (sample.c2s_bytes as f64 * 8.0) / tick_secs;
let incoming_bps = (sample.s2c_requested_bytes as f64 * 8.0) / tick_secs;
let outgoing_bps = (sample.s2c_written_bytes as f64 * 8.0) / tick_secs;
let throughput = c2s_bps.max(outgoing_bps);
self.throughput_ema_bps = ema(self.throughput_ema_bps, throughput);
self.incoming_ema_bps = ema(self.incoming_ema_bps, incoming_bps);
self.outgoing_ema_bps = ema(self.outgoing_ema_bps, outgoing_bps);
let tier1_now = self.throughput_ema_bps >= THROUGHPUT_UP_BPS;
if tier1_now {
self.tier1_hold_ticks = self.tier1_hold_ticks.saturating_add(1);
} else {
self.tier1_hold_ticks = 0;
}
let ratio = if self.outgoing_ema_bps <= f64::EPSILON {
0.0
} else {
self.incoming_ema_bps / self.outgoing_ema_bps
};
let tier2_now = ratio >= RATIO_CONFIRM_THRESHOLD;
if tier2_now {
self.tier2_hold_ticks = self.tier2_hold_ticks.saturating_add(1);
} else {
self.tier2_hold_ticks = 0;
}
let partial_ratio = if sample.s2c_write_ops == 0 {
0.0
} else {
sample.s2c_partial_writes as f64 / sample.s2c_write_ops as f64
};
let hard_now = sample.s2c_consecutive_pending_writes >= HARD_PENDING_THRESHOLD
|| partial_ratio >= HARD_PARTIAL_RATIO_THRESHOLD;
if hard_now && self.hard_cooldown_ticks == 0 {
return self.promote(TierTransitionReason::HardPressure, HARD_COOLDOWN_TICKS);
}
if self.tier1_hold_ticks >= TIER1_HOLD_TICKS && self.tier2_hold_ticks >= TIER2_HOLD_TICKS {
return self.promote(TierTransitionReason::SoftConfirmed, 0);
}
let demote_candidate = self.throughput_ema_bps < THROUGHPUT_DOWN_BPS && !tier2_now && !hard_now;
if demote_candidate {
self.quiet_ticks = self.quiet_ticks.saturating_add(1);
if self.quiet_ticks >= QUIET_DEMOTE_TICKS {
self.quiet_ticks = 0;
return self.demote(TierTransitionReason::QuietDemotion);
}
} else {
self.quiet_ticks = 0;
}
None
}
fn promote(
&mut self,
reason: TierTransitionReason,
hard_cooldown_ticks: u32,
) -> Option<TierTransition> {
let from = self.tier;
let to = from.promote();
if from == to {
return None;
}
self.tier = to;
self.max_tier_seen = max(self.max_tier_seen, to);
self.hard_cooldown_ticks = hard_cooldown_ticks;
self.tier1_hold_ticks = 0;
self.tier2_hold_ticks = 0;
self.quiet_ticks = 0;
Some(TierTransition { from, to, reason })
}
fn demote(&mut self, reason: TierTransitionReason) -> Option<TierTransition> {
let from = self.tier;
let to = from.demote();
if from == to {
return None;
}
self.tier = to;
self.tier1_hold_ticks = 0;
self.tier2_hold_ticks = 0;
Some(TierTransition { from, to, reason })
}
}
#[derive(Debug, Clone, Copy)]
struct UserAdaptiveProfile {
tier: AdaptiveTier,
seen_at: Instant,
}
fn profiles() -> &'static DashMap<String, UserAdaptiveProfile> {
static USER_PROFILES: OnceLock<DashMap<String, UserAdaptiveProfile>> = OnceLock::new();
USER_PROFILES.get_or_init(DashMap::new)
}
pub fn seed_tier_for_user(user: &str) -> AdaptiveTier {
let now = Instant::now();
if let Some(entry) = profiles().get(user) {
let value = entry.value();
if now.duration_since(value.seen_at) <= PROFILE_TTL {
return value.tier;
}
}
AdaptiveTier::Base
}
pub fn record_user_tier(user: &str, tier: AdaptiveTier) {
let now = Instant::now();
if let Some(mut entry) = profiles().get_mut(user) {
let existing = *entry;
let effective = if now.duration_since(existing.seen_at) > PROFILE_TTL {
tier
} else {
max(existing.tier, tier)
};
*entry = UserAdaptiveProfile {
tier: effective,
seen_at: now,
};
return;
}
profiles().insert(
user.to_string(),
UserAdaptiveProfile { tier, seen_at: now },
);
}
pub fn direct_copy_buffers_for_tier(
tier: AdaptiveTier,
base_c2s: usize,
base_s2c: usize,
) -> (usize, usize) {
let (num, den) = tier.ratio();
(
scale(base_c2s, num, den, DIRECT_C2S_CAP_BYTES),
scale(base_s2c, num, den, DIRECT_S2C_CAP_BYTES),
)
}
pub fn me_flush_policy_for_tier(
tier: AdaptiveTier,
base_frames: usize,
base_bytes: usize,
base_delay: Duration,
) -> (usize, usize, Duration) {
let (num, den) = tier.ratio();
let frames = scale(base_frames, num, den, ME_FRAMES_CAP).max(1);
let bytes = scale(base_bytes, num, den, ME_BYTES_CAP).max(4096);
let delay_us = base_delay.as_micros() as u64;
let adjusted_delay_us = match tier {
AdaptiveTier::Base => delay_us,
AdaptiveTier::Tier1 => (delay_us.saturating_mul(7)).saturating_div(10),
AdaptiveTier::Tier2 => delay_us.saturating_div(2),
AdaptiveTier::Tier3 => (delay_us.saturating_mul(3)).saturating_div(10),
}
.max(ME_DELAY_MIN_US)
.min(delay_us.max(ME_DELAY_MIN_US));
(frames, bytes, Duration::from_micros(adjusted_delay_us))
}
fn ema(prev: f64, value: f64) -> f64 {
if prev <= f64::EPSILON {
value
} else {
(prev * (1.0 - EMA_ALPHA)) + (value * EMA_ALPHA)
}
}
fn scale(base: usize, numerator: usize, denominator: usize, cap: usize) -> usize {
let scaled = base
.saturating_mul(numerator)
.saturating_div(denominator.max(1));
scaled.min(cap).max(1)
}
#[cfg(test)]
mod tests {
use super::*;
fn sample(
c2s_bytes: u64,
s2c_requested_bytes: u64,
s2c_written_bytes: u64,
s2c_write_ops: u64,
s2c_partial_writes: u64,
s2c_consecutive_pending_writes: u32,
) -> RelaySignalSample {
RelaySignalSample {
c2s_bytes,
s2c_requested_bytes,
s2c_written_bytes,
s2c_write_ops,
s2c_partial_writes,
s2c_consecutive_pending_writes,
}
}
#[test]
fn test_soft_promotion_requires_tier1_and_tier2() {
let mut ctrl = SessionAdaptiveController::new(AdaptiveTier::Base);
let tick_secs = 0.25;
let mut promoted = None;
for _ in 0..8 {
promoted = ctrl.observe(
sample(
300_000, // ~9.6 Mbps
320_000, // incoming > outgoing to confirm tier2
250_000,
10,
0,
0,
),
tick_secs,
);
}
let transition = promoted.expect("expected soft promotion");
assert_eq!(transition.from, AdaptiveTier::Base);
assert_eq!(transition.to, AdaptiveTier::Tier1);
assert_eq!(transition.reason, TierTransitionReason::SoftConfirmed);
}
#[test]
fn test_hard_promotion_on_pending_pressure() {
let mut ctrl = SessionAdaptiveController::new(AdaptiveTier::Base);
let transition = ctrl
.observe(
sample(10_000, 20_000, 10_000, 4, 1, 3),
0.25,
)
.expect("expected hard promotion");
assert_eq!(transition.reason, TierTransitionReason::HardPressure);
assert_eq!(transition.to, AdaptiveTier::Tier1);
}
#[test]
fn test_quiet_demotion_is_slow_and_stepwise() {
let mut ctrl = SessionAdaptiveController::new(AdaptiveTier::Tier2);
let mut demotion = None;
for _ in 0..QUIET_DEMOTE_TICKS {
demotion = ctrl.observe(sample(1, 1, 1, 1, 0, 0), 0.25);
}
let transition = demotion.expect("expected quiet demotion");
assert_eq!(transition.from, AdaptiveTier::Tier2);
assert_eq!(transition.to, AdaptiveTier::Tier1);
assert_eq!(transition.reason, TierTransitionReason::QuietDemotion);
}
}

View File

@@ -84,6 +84,7 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle
use crate::proxy::masking::handle_bad_client;
use crate::proxy::middle_relay::handle_via_middle_proxy;
use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController};
use crate::proxy::session_eviction::register_session;
fn beobachten_ttl(config: &ProxyConfig) -> Duration {
let minutes = config.general.beobachten_minutes;
@@ -866,6 +867,17 @@ impl RunningClientHandler {
}
};
let registration = register_session(&user, success.dc_idx);
if registration.replaced_existing {
stats.increment_reconnect_evict_total();
warn!(
user = %user,
dc = success.dc_idx,
"Reconnect detected: replacing active session for user+dc"
);
}
let session_lease = registration.lease;
let route_snapshot = route_runtime.snapshot();
let session_id = rng.u64();
let relay_result = if config.general.use_middle_proxy
@@ -885,6 +897,7 @@ impl RunningClientHandler {
route_runtime.subscribe(),
route_snapshot,
session_id,
session_lease.clone(),
)
.await
} else {
@@ -901,6 +914,7 @@ impl RunningClientHandler {
route_runtime.subscribe(),
route_snapshot,
session_id,
session_lease.clone(),
)
.await
}
@@ -918,6 +932,7 @@ impl RunningClientHandler {
route_runtime.subscribe(),
route_snapshot,
session_id,
session_lease.clone(),
)
.await
};

View File

@@ -22,6 +22,8 @@ use crate::proxy::route_mode::{
RelayRouteMode, RouteCutoverState, ROUTE_SWITCH_ERROR_MSG, affected_cutover_state,
cutover_stagger_delay,
};
use crate::proxy::adaptive_buffers;
use crate::proxy::session_eviction::SessionLease;
use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::UpstreamManager;
@@ -183,6 +185,7 @@ pub(crate) async fn handle_via_direct<R, W>(
mut route_rx: watch::Receiver<RouteCutoverState>,
route_snapshot: RouteCutoverState,
session_id: u64,
session_lease: SessionLease,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
@@ -222,17 +225,27 @@ where
stats.increment_user_connects(user);
let _direct_connection_lease = stats.acquire_direct_connection_lease();
let seed_tier = adaptive_buffers::seed_tier_for_user(user);
let (c2s_copy_buf, s2c_copy_buf) = adaptive_buffers::direct_copy_buffers_for_tier(
seed_tier,
config.general.direct_relay_copy_buf_c2s_bytes,
config.general.direct_relay_copy_buf_s2c_bytes,
);
let relay_result = relay_bidirectional(
client_reader,
client_writer,
tg_reader,
tg_writer,
config.general.direct_relay_copy_buf_c2s_bytes,
config.general.direct_relay_copy_buf_s2c_bytes,
c2s_copy_buf,
s2c_copy_buf,
user,
success.dc_idx,
Arc::clone(&stats),
config.access.user_data_quota.get(user).copied(),
buffer_pool,
session_lease,
seed_tier,
);
tokio::pin!(relay_result);
let relay_result = loop {

View File

@@ -1,5 +1,6 @@
//! Proxy Defs
pub mod adaptive_buffers;
pub mod client;
pub mod direct_relay;
pub mod handshake;
@@ -7,6 +8,7 @@ pub mod masking;
pub mod middle_relay;
pub mod route_mode;
pub mod relay;
pub mod session_eviction;
pub use client::ClientHandler;
#[allow(unused_imports)]

View File

@@ -0,0 +1,46 @@
/// Session eviction is intentionally disabled in runtime.
///
/// The initial `user+dc` single-lease model caused valid parallel client
/// connections to evict each other. Keep the API shape for compatibility,
/// but make it a no-op until a safer policy is introduced.
#[derive(Debug, Clone, Default)]
pub struct SessionLease;
impl SessionLease {
pub fn is_stale(&self) -> bool {
false
}
#[allow(dead_code)]
pub fn release(&self) {}
}
pub struct RegistrationResult {
pub lease: SessionLease,
pub replaced_existing: bool,
}
pub fn register_session(_user: &str, _dc_idx: i16) -> RegistrationResult {
RegistrationResult {
lease: SessionLease,
replaced_existing: false,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_session_eviction_disabled_behavior() {
let first = register_session("alice", 2);
let second = register_session("alice", 2);
assert!(!first.replaced_existing);
assert!(!second.replaced_existing);
assert!(!first.lease.is_stale());
assert!(!second.lease.is_stale());
first.lease.release();
second.lease.release();
}
}

View File

@@ -14,8 +14,7 @@ use std::sync::Arc;
// ============= Configuration =============
/// Default buffer size
/// CHANGED: Reduced from 64KB to 16KB to match TLS record size and prevent bufferbloat.
pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024;
pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;
/// Default maximum number of pooled buffers
pub const DEFAULT_MAX_BUFFERS: usize = 1024;

View File

@@ -299,6 +299,11 @@ async fn run_update_cycle(
cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_pool_drain_threshold,
cfg.general.me_pool_drain_soft_evict_enabled,
cfg.general.me_pool_drain_soft_evict_grace_secs,
cfg.general.me_pool_drain_soft_evict_per_writer,
cfg.general.me_pool_drain_soft_evict_budget_per_core,
cfg.general.me_pool_drain_soft_evict_cooldown_ms,
cfg.general.effective_me_pool_force_close_secs(),
cfg.general.me_pool_min_fresh_ratio,
cfg.general.me_hardswap_warmup_delay_min_ms,
@@ -526,6 +531,11 @@ pub async fn me_config_updater(
cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_pool_drain_threshold,
cfg.general.me_pool_drain_soft_evict_enabled,
cfg.general.me_pool_drain_soft_evict_grace_secs,
cfg.general.me_pool_drain_soft_evict_per_writer,
cfg.general.me_pool_drain_soft_evict_budget_per_core,
cfg.general.me_pool_drain_soft_evict_cooldown_ms,
cfg.general.effective_me_pool_force_close_secs(),
cfg.general.me_pool_min_fresh_ratio,
cfg.general.me_hardswap_warmup_delay_min_ms,

View File

@@ -83,6 +83,11 @@ async fn make_pool(
general.hardswap,
general.me_pool_drain_ttl_secs,
general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs,
general.me_pool_drain_soft_evict_per_writer,
general.me_pool_drain_soft_evict_budget_per_core,
general.me_pool_drain_soft_evict_cooldown_ms,
general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio,
general.me_hardswap_warmup_delay_min_ms,
@@ -107,6 +112,8 @@ async fn make_pool(
general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms,
);
@@ -220,10 +227,11 @@ async fn set_writer_runtime_state(
async fn reap_draining_writers_clears_warn_state_when_pool_empty() {
let (pool, _rng) = make_pool(128, 1, 1).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
warn_next_allowed.insert(11, Instant::now() + Duration::from_secs(5));
warn_next_allowed.insert(22, Instant::now() + Duration::from_secs(5));
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.is_empty());
}
@@ -232,6 +240,8 @@ async fn reap_draining_writers_clears_warn_state_when_pool_empty() {
async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycles() {
let threshold = 3u64;
let (pool, _rng) = make_pool(threshold, 1, 1).await;
pool.me_pool_drain_soft_evict_enabled
.store(false, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs();
for writer_id in 1..=60u64 {
@@ -246,8 +256,9 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for _ in 0..64 {
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
if writer_count(&pool).await <= threshold as usize {
break;
}
@@ -275,11 +286,12 @@ async fn reap_draining_writers_handles_large_empty_writer_population() {
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for _ in 0..24 {
if writer_count(&pool).await == 0 {
break;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
}
assert_eq!(writer_count(&pool).await, 0);
@@ -303,11 +315,12 @@ async fn reap_draining_writers_processes_mass_deadline_expiry_without_unbounded_
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for _ in 0..40 {
if writer_count(&pool).await == 0 {
break;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
}
assert_eq!(writer_count(&pool).await, 0);
@@ -318,6 +331,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
let (pool, _rng) = make_pool(128, 1, 1).await;
let now_epoch_secs = MePool::now_epoch_secs();
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for wave in 0..40u64 {
for offset in 0..8u64 {
@@ -331,7 +345,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
.await;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.len() <= writer_count(&pool).await);
let ids = sorted_writer_ids(&pool).await;
@@ -339,7 +353,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
let _ = pool.remove_writer_and_close_clients(writer_id).await;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.len() <= writer_count(&pool).await);
}
}
@@ -361,9 +375,10 @@ async fn reap_draining_writers_budgeted_cleanup_never_increases_pool_size() {
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
let mut previous = writer_count(&pool).await;
for _ in 0..32 {
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
let current = writer_count(&pool).await;
assert!(current <= previous);
previous = current;

View File

@@ -81,6 +81,11 @@ async fn make_pool(
general.hardswap,
general.me_pool_drain_ttl_secs,
general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs,
general.me_pool_drain_soft_evict_per_writer,
general.me_pool_drain_soft_evict_budget_per_core,
general.me_pool_drain_soft_evict_cooldown_ms,
general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio,
general.me_hardswap_warmup_delay_min_ms,
@@ -105,6 +110,8 @@ async fn make_pool(
general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms,
);

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use bytes::Bytes;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
@@ -39,7 +40,7 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
NetworkDecision::default(),
None,
Arc::new(SecureRandom::new()),
Arc::new(Stats::default()),
Arc::new(Stats::new()),
general.me_keepalive_enabled,
general.me_keepalive_interval_secs,
general.me_keepalive_jitter_secs,
@@ -74,6 +75,11 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
general.hardswap,
general.me_pool_drain_ttl_secs,
general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs,
general.me_pool_drain_soft_evict_per_writer,
general.me_pool_drain_soft_evict_budget_per_core,
general.me_pool_drain_soft_evict_cooldown_ms,
general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio,
general.me_hardswap_warmup_delay_min_ms,
@@ -98,6 +104,8 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms,
)
@@ -190,14 +198,15 @@ async fn reap_draining_writers_drops_warn_state_for_removed_writer() {
let conn_ids =
insert_draining_writer(&pool, 7, now_epoch_secs.saturating_sub(180), 1, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.contains_key(&7));
let _ = pool.remove_writer_and_close_clients(7).await;
assert!(pool.registry.get_writer(conn_ids[0]).await.is_none());
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(!warn_next_allowed.contains_key(&7));
}
@@ -209,12 +218,96 @@ async fn reap_draining_writers_removes_empty_draining_writers() {
insert_draining_writer(&pool, 2, now_epoch_secs.saturating_sub(30), 0, 0).await;
insert_draining_writer(&pool, 3, now_epoch_secs.saturating_sub(20), 1, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(current_writer_ids(&pool).await, vec![3]);
}
#[tokio::test]
async fn reap_draining_writers_does_not_block_on_stuck_writer_close_signal() {
let pool = make_pool(128).await;
let now_epoch_secs = MePool::now_epoch_secs();
let (blocked_tx, blocked_rx) = mpsc::channel::<WriterCommand>(1);
assert!(
blocked_tx
.try_send(WriterCommand::Data(Bytes::from_static(b"stuck")))
.is_ok()
);
let blocked_rx_guard = tokio::spawn(async move {
let _hold_rx = blocked_rx;
tokio::time::sleep(Duration::from_secs(30)).await;
});
let blocked_writer_id = 90u64;
let blocked_writer = MeWriter {
id: blocked_writer_id,
addr: SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
4500 + blocked_writer_id as u16,
),
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
writer_dc: 2,
generation: 1,
contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())),
created_at: Instant::now() - Duration::from_secs(blocked_writer_id),
tx: blocked_tx.clone(),
cancel: CancellationToken::new(),
degraded: Arc::new(AtomicBool::new(false)),
rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
draining: Arc::new(AtomicBool::new(true)),
draining_started_at_epoch_secs: Arc::new(AtomicU64::new(
now_epoch_secs.saturating_sub(120),
)),
drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
allow_drain_fallback: Arc::new(AtomicBool::new(false)),
};
pool.writers.write().await.push(blocked_writer);
pool.registry
.register_writer(blocked_writer_id, blocked_tx)
.await;
pool.conn_count.fetch_add(1, Ordering::Relaxed);
insert_draining_writer(&pool, 91, now_epoch_secs.saturating_sub(110), 0, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
let reap_res = tokio::time::timeout(
Duration::from_millis(500),
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed),
)
.await;
blocked_rx_guard.abort();
assert!(reap_res.is_ok(), "reap should not block on close signal");
assert!(current_writer_ids(&pool).await.is_empty());
assert_eq!(pool.stats.get_me_writer_close_signal_drop_total(), 2);
assert_eq!(pool.stats.get_me_writer_close_signal_channel_full_total(), 1);
assert_eq!(pool.stats.get_me_draining_writers_reap_progress_total(), 2);
let activity = pool.registry.writer_activity_snapshot().await;
assert!(!activity.bound_clients_by_writer.contains_key(&blocked_writer_id));
assert!(!activity.bound_clients_by_writer.contains_key(&91));
let (probe_conn_id, _rx) = pool.registry.register().await;
assert!(
!pool.registry
.bind_writer(
probe_conn_id,
blocked_writer_id,
ConnMeta {
target_dc: 2,
client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6400),
our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443),
proto_flags: 0,
},
)
.await
);
let _ = pool.registry.unregister(probe_conn_id).await;
}
#[tokio::test]
async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
let pool = make_pool(2).await;
@@ -224,8 +317,9 @@ async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
insert_draining_writer(&pool, 33, now_epoch_secs.saturating_sub(20), 1, 0).await;
insert_draining_writer(&pool, 44, now_epoch_secs.saturating_sub(10), 1, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(current_writer_ids(&pool).await, vec![33, 44]);
}
@@ -243,8 +337,9 @@ async fn reap_draining_writers_deadline_force_close_applies_under_threshold() {
)
.await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(current_writer_ids(&pool).await.is_empty());
}
@@ -266,8 +361,9 @@ async fn reap_draining_writers_limits_closes_per_health_tick() {
.await;
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(pool.writers.read().await.len(), writer_total - close_budget);
}
@@ -406,12 +502,13 @@ async fn reap_draining_writers_backlog_drains_across_ticks() {
.await;
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for _ in 0..8 {
if pool.writers.read().await.is_empty() {
break;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
}
assert!(pool.writers.read().await.is_empty());
@@ -435,9 +532,10 @@ async fn reap_draining_writers_threshold_backlog_converges_to_threshold() {
.await;
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for _ in 0..16 {
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
if pool.writers.read().await.len() <= threshold as usize {
break;
}
@@ -454,8 +552,9 @@ async fn reap_draining_writers_threshold_zero_preserves_non_expired_non_empty_wr
insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(30), 1, 0).await;
insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(20), 1, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(current_writer_ids(&pool).await, vec![10, 20, 30]);
}
@@ -478,8 +577,9 @@ async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() {
let empty_writer_id = close_budget as u64 + 1;
insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(current_writer_ids(&pool).await, vec![empty_writer_id]);
}
@@ -491,8 +591,9 @@ async fn reap_draining_writers_empty_cleanup_does_not_increment_force_close_metr
insert_draining_writer(&pool, 1, now_epoch_secs.saturating_sub(60), 0, 0).await;
insert_draining_writer(&pool, 2, now_epoch_secs.saturating_sub(50), 0, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(current_writer_ids(&pool).await.is_empty());
assert_eq!(pool.stats.get_pool_force_close_total(), 0);
@@ -519,8 +620,9 @@ async fn reap_draining_writers_handles_duplicate_force_close_requests_for_same_w
)
.await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(current_writer_ids(&pool).await.is_empty());
}
@@ -530,6 +632,7 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population
let pool = make_pool(128).await;
let now_epoch_secs = MePool::now_epoch_secs();
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for wave in 0..12u64 {
for offset in 0..9u64 {
@@ -542,14 +645,14 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population
)
.await;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.len() <= pool.writers.read().await.len());
let existing_writer_ids = current_writer_ids(&pool).await;
for writer_id in existing_writer_ids.into_iter().take(4) {
let _ = pool.remove_writer_and_close_clients(writer_id).await;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.len() <= pool.writers.read().await.len());
}
}
@@ -559,6 +662,7 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat
let pool = make_pool(6).await;
let now_epoch_secs = MePool::now_epoch_secs();
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for writer_id in 1..=18u64 {
let bound_clients = if writer_id % 3 == 0 { 0 } else { 1 };
@@ -578,7 +682,7 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat
}
for _ in 0..16 {
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
if pool.writers.read().await.len() <= 6 {
break;
}
@@ -588,9 +692,62 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat
assert!(warn_next_allowed.len() <= pool.writers.read().await.len());
}
#[tokio::test]
async fn reap_draining_writers_soft_evicts_stuck_writer_with_per_writer_cap() {
let pool = make_pool(128).await;
pool.me_pool_drain_soft_evict_enabled.store(true, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_grace_secs.store(0, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_per_writer.store(1, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_budget_per_core.store(8, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_cooldown_ms
.store(1, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 77, now_epoch_secs.saturating_sub(240), 3, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
let activity = pool.registry.writer_activity_snapshot().await;
assert_eq!(activity.bound_clients_by_writer.get(&77), Some(&2));
assert_eq!(pool.stats.get_pool_drain_soft_evict_total(), 1);
assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1);
assert_eq!(current_writer_ids(&pool).await, vec![77]);
}
#[tokio::test]
async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() {
let pool = make_pool(128).await;
pool.me_pool_drain_soft_evict_enabled.store(true, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_grace_secs.store(0, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_per_writer.store(1, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_budget_per_core.store(8, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_cooldown_ms
.store(60_000, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 88, now_epoch_secs.saturating_sub(240), 3, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
let activity = pool.registry.writer_activity_snapshot().await;
assert_eq!(activity.bound_clients_by_writer.get(&88), Some(&2));
assert_eq!(pool.stats.get_pool_drain_soft_evict_total(), 1);
assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1);
}
#[test]
fn general_config_default_drain_threshold_remains_enabled() {
assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128);
assert!(GeneralConfig::default().me_pool_drain_soft_evict_enabled);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_per_writer,
1
);
}
#[tokio::test]

View File

@@ -70,10 +70,12 @@ impl MePool {
let mut missing_dc = Vec::<i32>::new();
let mut covered = 0usize;
let mut total = 0usize;
for (dc, endpoints) in desired_by_dc {
if endpoints.is_empty() {
continue;
}
total += 1;
if endpoints
.iter()
.any(|addr| active_writer_addrs.contains(&(*dc, *addr)))
@@ -85,7 +87,9 @@ impl MePool {
}
missing_dc.sort_unstable();
let total = desired_by_dc.len().max(1);
if total == 0 {
return (1.0, missing_dc);
}
let ratio = (covered as f32) / (total as f32);
(ratio, missing_dc)
}
@@ -431,29 +435,21 @@ impl MePool {
}
if hardswap {
let mut fresh_missing_dc = Vec::<(i32, usize, usize)>::new();
for (dc, endpoints) in &desired_by_dc {
if endpoints.is_empty() {
continue;
}
let required = self.required_writers_for_dc(endpoints.len());
let fresh_count = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| w.generation == generation)
.filter(|w| w.writer_dc == *dc)
.filter(|w| endpoints.contains(&w.addr))
.count();
if fresh_count < required {
fresh_missing_dc.push((*dc, fresh_count, required));
}
}
let fresh_writer_addrs: HashSet<(i32, SocketAddr)> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| w.generation == generation)
.map(|w| (w.writer_dc, w.addr))
.collect();
let (fresh_coverage_ratio, fresh_missing_dc) =
Self::coverage_ratio(&desired_by_dc, &fresh_writer_addrs);
if !fresh_missing_dc.is_empty() {
warn!(
previous_generation,
generation,
fresh_coverage_ratio = format_args!("{fresh_coverage_ratio:.3}"),
missing_dc = ?fresh_missing_dc,
"ME hardswap pending: fresh generation coverage incomplete"
"ME hardswap pending: fresh generation DC coverage incomplete"
);
return;
}
@@ -541,3 +537,61 @@ impl MePool {
self.zero_downtime_reinit_after_map_change(rng).await;
}
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use super::MePool;
fn addr(octet: u8, port: u16) -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, octet)), port)
}
#[test]
fn coverage_ratio_counts_dc_coverage_not_floor() {
let dc1 = addr(1, 2001);
let dc2 = addr(2, 2002);
let mut desired_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
desired_by_dc.insert(1, HashSet::from([dc1]));
desired_by_dc.insert(2, HashSet::from([dc2]));
let active_writer_addrs = HashSet::from([(1, dc1)]);
let (ratio, missing_dc) = MePool::coverage_ratio(&desired_by_dc, &active_writer_addrs);
assert_eq!(ratio, 0.5);
assert_eq!(missing_dc, vec![2]);
}
#[test]
fn coverage_ratio_ignores_empty_dc_groups() {
let dc1 = addr(1, 2001);
let mut desired_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
desired_by_dc.insert(1, HashSet::from([dc1]));
desired_by_dc.insert(2, HashSet::new());
let active_writer_addrs = HashSet::from([(1, dc1)]);
let (ratio, missing_dc) = MePool::coverage_ratio(&desired_by_dc, &active_writer_addrs);
assert_eq!(ratio, 1.0);
assert!(missing_dc.is_empty());
}
#[test]
fn coverage_ratio_reports_missing_dcs_sorted() {
let dc1 = addr(1, 2001);
let dc2 = addr(2, 2002);
let mut desired_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
desired_by_dc.insert(2, HashSet::from([dc2]));
desired_by_dc.insert(1, HashSet::from([dc1]));
let (ratio, missing_dc) = MePool::coverage_ratio(&desired_by_dc, &HashSet::new());
assert_eq!(ratio, 0.0);
assert_eq!(missing_dc, vec![1, 2]);
}
}

View File

@@ -40,6 +40,7 @@ pub(crate) struct MeApiDcStatusSnapshot {
pub floor_max: usize,
pub floor_capped: bool,
pub alive_writers: usize,
pub coverage_ratio: f64,
pub coverage_pct: f64,
pub fresh_alive_writers: usize,
pub fresh_coverage_pct: f64,
@@ -62,6 +63,7 @@ pub(crate) struct MeApiStatusSnapshot {
pub available_pct: f64,
pub required_writers: usize,
pub alive_writers: usize,
pub coverage_ratio: f64,
pub coverage_pct: f64,
pub fresh_alive_writers: usize,
pub fresh_coverage_pct: f64,
@@ -124,6 +126,11 @@ pub(crate) struct MeApiRuntimeSnapshot {
pub me_reconnect_backoff_cap_ms: u64,
pub me_reconnect_fast_retry_count: u32,
pub me_pool_drain_ttl_secs: u64,
pub me_pool_drain_soft_evict_enabled: bool,
pub me_pool_drain_soft_evict_grace_secs: u64,
pub me_pool_drain_soft_evict_per_writer: u8,
pub me_pool_drain_soft_evict_budget_per_core: u16,
pub me_pool_drain_soft_evict_cooldown_ms: u64,
pub me_pool_force_close_secs: u64,
pub me_pool_min_fresh_ratio: f32,
pub me_bind_stale_mode: &'static str,
@@ -337,6 +344,8 @@ impl MePool {
let mut available_endpoints = 0usize;
let mut alive_writers = 0usize;
let mut fresh_alive_writers = 0usize;
let mut coverage_ratio_dcs_total = 0usize;
let mut coverage_ratio_dcs_covered = 0usize;
let floor_mode = self.floor_mode();
let adaptive_cpu_cores = (self
.me_adaptive_floor_cpu_cores_effective
@@ -388,6 +397,12 @@ impl MePool {
available_endpoints += dc_available_endpoints;
alive_writers += dc_alive_writers;
fresh_alive_writers += dc_fresh_alive_writers;
if endpoint_count > 0 {
coverage_ratio_dcs_total += 1;
if dc_alive_writers > 0 {
coverage_ratio_dcs_covered += 1;
}
}
dcs.push(MeApiDcStatusSnapshot {
dc,
@@ -410,6 +425,11 @@ impl MePool {
floor_max,
floor_capped,
alive_writers: dc_alive_writers,
coverage_ratio: if endpoint_count > 0 && dc_alive_writers > 0 {
100.0
} else {
0.0
},
coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers),
fresh_alive_writers: dc_fresh_alive_writers,
fresh_coverage_pct: ratio_pct(dc_fresh_alive_writers, dc_required_writers),
@@ -426,6 +446,7 @@ impl MePool {
available_pct: ratio_pct(available_endpoints, configured_endpoints),
required_writers,
alive_writers,
coverage_ratio: ratio_pct(coverage_ratio_dcs_covered, coverage_ratio_dcs_total),
coverage_pct: ratio_pct(alive_writers, required_writers),
fresh_alive_writers,
fresh_coverage_pct: ratio_pct(fresh_alive_writers, required_writers),
@@ -562,6 +583,22 @@ impl MePool {
me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64,
me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count,
me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed),
me_pool_drain_soft_evict_enabled: self
.me_pool_drain_soft_evict_enabled
.load(Ordering::Relaxed),
me_pool_drain_soft_evict_grace_secs: self
.me_pool_drain_soft_evict_grace_secs
.load(Ordering::Relaxed),
me_pool_drain_soft_evict_per_writer: self
.me_pool_drain_soft_evict_per_writer
.load(Ordering::Relaxed),
me_pool_drain_soft_evict_budget_per_core: self
.me_pool_drain_soft_evict_budget_per_core
.load(Ordering::Relaxed)
.min(u16::MAX as u32) as u16,
me_pool_drain_soft_evict_cooldown_ms: self
.me_pool_drain_soft_evict_cooldown_ms
.load(Ordering::Relaxed),
me_pool_force_close_secs: self.me_pool_force_close_secs.load(Ordering::Relaxed),
me_pool_min_fresh_ratio: Self::permille_to_ratio(
self.me_pool_min_fresh_ratio_permille.load(Ordering::Relaxed),

View File

@@ -8,6 +8,7 @@ use bytes::Bytes;
use bytes::BytesMut;
use rand::Rng;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
@@ -311,41 +312,28 @@ impl MePool {
let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
p.extend_from_slice(&sent_id.to_le_bytes());
{
let mut tracker = ping_tracker_ping.lock().await;
let now_epoch_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let mut run_cleanup = false;
if let Some(pool) = pool_ping.upgrade() {
let last_cleanup_ms = pool
let now_epoch_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let mut run_cleanup = false;
if let Some(pool) = pool_ping.upgrade() {
let last_cleanup_ms = pool
.ping_tracker_last_cleanup_epoch_ms
.load(Ordering::Relaxed);
if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000
&& pool
.ping_tracker_last_cleanup_epoch_ms
.load(Ordering::Relaxed);
if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000
&& pool
.ping_tracker_last_cleanup_epoch_ms
.compare_exchange(
last_cleanup_ms,
now_epoch_ms,
Ordering::AcqRel,
Ordering::Relaxed,
)
.is_ok()
{
run_cleanup = true;
}
.compare_exchange(
last_cleanup_ms,
now_epoch_ms,
Ordering::AcqRel,
Ordering::Relaxed,
)
.is_ok()
{
run_cleanup = true;
}
if run_cleanup {
let before = tracker.len();
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
let expired = before.saturating_sub(tracker.len());
if expired > 0 {
stats_ping.increment_me_keepalive_timeout_by(expired as u64);
}
}
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
}
ping_id = ping_id.wrapping_add(1);
stats_ping.increment_me_keepalive_sent();
@@ -366,6 +354,16 @@ impl MePool {
}
break;
}
let mut tracker = ping_tracker_ping.lock().await;
if run_cleanup {
let before = tracker.len();
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
let expired = before.saturating_sub(tracker.len());
if expired > 0 {
stats_ping.increment_me_keepalive_timeout_by(expired as u64);
}
}
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
}
});
@@ -493,11 +491,9 @@ impl MePool {
}
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
let conns = self.remove_writer_only(writer_id).await;
for bound in conns {
let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
let _ = self.registry.unregister(bound.conn_id).await;
}
// Full client cleanup now happens inside `registry.writer_lost` to keep
// writer reap/remove paths strictly non-blocking per connection.
let _ = self.remove_writer_only(writer_id).await;
}
pub(crate) async fn remove_writer_if_empty(self: &Arc<Self>, writer_id: u64) -> bool {
@@ -539,6 +535,11 @@ impl MePool {
self.conn_count.fetch_sub(1, Ordering::Relaxed);
}
}
// State invariant:
// - writer is removed from `self.writers` (pool visibility),
// - writer is removed from registry routing/binding maps via `writer_lost`.
// The close command below is only a best-effort accelerator for task shutdown.
// Cleanup progress must never depend on command-channel availability.
let conns = self.registry.writer_lost(writer_id).await;
{
let mut tracker = self.ping_tracker.lock().await;
@@ -546,7 +547,25 @@ impl MePool {
}
self.rtt_stats.lock().await.remove(&writer_id);
if let Some(tx) = close_tx {
let _ = tx.send(WriterCommand::Close).await;
match tx.try_send(WriterCommand::Close) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
self.stats.increment_me_writer_close_signal_drop_total();
self.stats
.increment_me_writer_close_signal_channel_full_total();
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is full"
);
}
Err(TrySendError::Closed(_)) => {
self.stats.increment_me_writer_close_signal_drop_total();
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is closed"
);
}
}
}
if trigger_refill
&& let Some(addr) = removed_addr

View File

@@ -8,6 +8,7 @@ use bytes::{Bytes, BytesMut};
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::{Mutex, mpsc};
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
@@ -173,12 +174,12 @@ pub(crate) async fn reader_loop(
} else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "RPC_CLOSE_EXT from ME");
reg.route(cid, MeResponse::Close).await;
let _ = reg.route_nowait(cid, MeResponse::Close).await;
reg.unregister(cid).await;
} else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "RPC_CLOSE_CONN from ME");
reg.route(cid, MeResponse::Close).await;
let _ = reg.route_nowait(cid, MeResponse::Close).await;
reg.unregister(cid).await;
} else if pt == RPC_PING_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
@@ -186,13 +187,15 @@ pub(crate) async fn reader_loop(
let mut pong = Vec::with_capacity(12);
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
pong.extend_from_slice(&ping_id.to_le_bytes());
if tx
.send(WriterCommand::DataAndFlush(Bytes::from(pong)))
.await
.is_err()
{
warn!("PONG send failed");
break;
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(pong))) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
debug!(ping_id, "PONG dropped: writer command channel is full");
}
Err(TrySendError::Closed(_)) => {
warn!("PONG send failed: writer channel closed");
break;
}
}
} else if pt == RPC_PONG_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
@@ -232,6 +235,13 @@ async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes());
let _ = tx.send(WriterCommand::DataAndFlush(Bytes::from(p))).await;
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
debug!(conn_id, "ME close_conn signal skipped: writer command channel is full");
}
Err(TrySendError::Closed(_)) => {
debug!(conn_id, "ME close_conn signal skipped: writer command channel is closed");
}
}
}

View File

@@ -11,6 +11,8 @@ use tokio::net::TcpStream;
use socket2::{Socket, TcpKeepalive, Domain, Type, Protocol};
use tracing::debug;
const DEFAULT_SOCKET_BUFFER_BYTES: usize = 256 * 1024;
/// Configure TCP socket with recommended settings for proxy use
#[allow(dead_code)]
pub fn configure_tcp_socket(
@@ -34,10 +36,10 @@ pub fn configure_tcp_socket(
socket.set_tcp_keepalive(&keepalive)?;
}
// CHANGED: Removed manual buffer size setting (was 256KB).
// Allowing the OS kernel to handle TCP window scaling (Autotuning) is critical
// for mobile clients to avoid bufferbloat and stalled connections during uploads.
// Use explicit baseline buffers to reduce slow-start stalls on high RTT links.
socket.set_recv_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
socket.set_send_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
Ok(())
}
@@ -62,6 +64,10 @@ pub fn configure_client_socket(
let keepalive = keepalive.with_interval(Duration::from_secs(keepalive_secs));
socket.set_tcp_keepalive(&keepalive)?;
// Keep explicit baseline buffers for predictable throughput across busy hosts.
socket.set_recv_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
socket.set_send_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
// Set TCP user timeout (Linux only)
// NOTE: iOS does not support TCP_USER_TIMEOUT - application-level timeout
@@ -124,6 +130,8 @@ pub fn create_outgoing_socket_bound(addr: SocketAddr, bind_addr: Option<IpAddr>)
// Disable Nagle
socket.set_nodelay(true)?;
socket.set_recv_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
socket.set_send_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
if let Some(bind_ip) = bind_addr {
let bind_sock_addr = SocketAddr::new(bind_ip, 0);