From 89e5668c7e000ac7ffd5c3512280236f64862902 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 18 Mar 2026 22:33:41 +0300 Subject: [PATCH] Runtime guardrails Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/defaults.rs | 20 ++++ src/config/hot_reload.rs | 9 ++ src/config/load.rs | 24 +++++ src/config/types.rs | 23 +++++ src/maestro/helpers.rs | 1 + src/maestro/listeners.rs | 98 +++++++++++++++---- src/maestro/me_startup.rs | 2 + src/proxy/middle_relay.rs | 82 +++++++++++++--- src/transport/middle_proxy/health.rs | 2 + .../middle_proxy/health_adversarial_tests.rs | 2 + .../middle_proxy/health_integration_tests.rs | 2 + .../middle_proxy/health_regression_tests.rs | 2 + src/transport/middle_proxy/pool.rs | 8 ++ src/transport/middle_proxy/pool_writer.rs | 63 ++++++------ src/transport/middle_proxy/send.rs | 84 ++++++++++++++-- 15 files changed, 345 insertions(+), 77 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 7b5b4a8..54a53b3 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -36,12 +36,16 @@ const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000; const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000; const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000; const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000; +const DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS: u64 = 3000; +const DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS: u64 = 250; +const DEFAULT_ME_C2ME_SEND_TIMEOUT_MS: u64 = 4000; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000; const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30; +const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5; const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000; @@ -156,6 +160,10 @@ pub(crate) fn default_server_max_connections() -> u32 { 10_000 } +pub(crate) fn default_accept_permit_timeout_ms() -> u64 { + DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS +} + pub(crate) fn default_prefer_4() -> u8 { 4 } @@ -380,6 +388,18 @@ pub(crate) fn default_me_warn_rate_limit_ms() -> u64 { DEFAULT_ME_WARN_RATE_LIMIT_MS } +pub(crate) fn default_me_route_hybrid_max_wait_ms() -> u64 { + DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS +} + +pub(crate) fn default_me_route_blocking_send_timeout_ms() -> u64 { + DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS +} + +pub(crate) fn default_me_c2me_send_timeout_ms() -> u64 { + DEFAULT_ME_C2ME_SEND_TIMEOUT_MS +} + pub(crate) fn default_upstream_connect_retry_attempts() -> u32 { DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index fdf06fa..7b94999 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -612,6 +612,8 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b || old.server.listen_tcp != new.server.listen_tcp || old.server.listen_unix_sock != new.server.listen_unix_sock || old.server.listen_unix_sock_perm != new.server.listen_unix_sock_perm + || old.server.max_connections != new.server.max_connections + || old.server.accept_permit_timeout_ms != new.server.accept_permit_timeout_ms { warned = true; warn!("config reload: server listener settings changed; restart required"); @@ 
-671,6 +673,9 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b } if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode || old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms + || old.general.me_route_hybrid_max_wait_ms != new.general.me_route_hybrid_max_wait_ms + || old.general.me_route_blocking_send_timeout_ms + != new.general.me_route_blocking_send_timeout_ms || old.general.me_route_inline_recovery_attempts != new.general.me_route_inline_recovery_attempts || old.general.me_route_inline_recovery_wait_ms @@ -679,6 +684,10 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b warned = true; warn!("config reload: general.me_route_no_writer_* changed; restart required"); } + if old.general.me_c2me_send_timeout_ms != new.general.me_c2me_send_timeout_ms { + warned = true; + warn!("config reload: general.me_c2me_send_timeout_ms changed; restart required"); + } if old.general.unknown_dc_log_path != new.general.unknown_dc_log_path || old.general.unknown_dc_file_log_enabled != new.general.unknown_dc_file_log_enabled { diff --git a/src/config/load.rs b/src/config/load.rs index 6fcbea3..0635f80 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -346,6 +346,12 @@ impl ProxyConfig { )); } + if config.general.me_c2me_send_timeout_ms > 60_000 { + return Err(ProxyError::Config( + "general.me_c2me_send_timeout_ms must be within [0, 60000]".to_string(), + )); + } + if config.general.me_reader_route_data_wait_ms > 20 { return Err(ProxyError::Config( "general.me_reader_route_data_wait_ms must be within [0, 20]".to_string(), @@ -627,6 +633,18 @@ impl ProxyConfig { )); } + if !(50..=60_000).contains(&config.general.me_route_hybrid_max_wait_ms) { + return Err(ProxyError::Config( + "general.me_route_hybrid_max_wait_ms must be within [50, 60000]".to_string(), + )); + } + + if config.general.me_route_blocking_send_timeout_ms > 5000 { + return Err(ProxyError::Config( + "general.me_route_blocking_send_timeout_ms must be within [0, 5000]".to_string(), + )); + } + if !(2..=4).contains(&config.general.me_writer_pick_sample_size) { return Err(ProxyError::Config( "general.me_writer_pick_sample_size must be within [2, 4]".to_string(), @@ -687,6 +705,12 @@ impl ProxyConfig { )); } + if config.server.accept_permit_timeout_ms > 60_000 { + return Err(ProxyError::Config( + "server.accept_permit_timeout_ms must be within [0, 60000]".to_string(), + )); + } + if config.general.effective_me_pool_force_close_secs() > 0 && config.general.effective_me_pool_force_close_secs() < config.general.me_pool_drain_ttl_secs diff --git a/src/config/types.rs b/src/config/types.rs index e507044..047f3c2 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -462,6 +462,11 @@ pub struct GeneralConfig { #[serde(default = "default_me_c2me_channel_capacity")] pub me_c2me_channel_capacity: usize, + /// Maximum wait in milliseconds for enqueueing C2ME commands when the queue is full. + /// `0` keeps legacy unbounded wait behavior. + #[serde(default = "default_me_c2me_send_timeout_ms")] + pub me_c2me_send_timeout_ms: u64, + /// Bounded wait in milliseconds for routing ME DATA to per-connection queue. /// `0` keeps legacy no-wait behavior. #[serde(default = "default_me_reader_route_data_wait_ms")] @@ -716,6 +721,15 @@ pub struct GeneralConfig { #[serde(default = "default_me_route_no_writer_wait_ms")] pub me_route_no_writer_wait_ms: u64, + /// Maximum cumulative wait in milliseconds for hybrid no-writer mode before failfast. 
+ #[serde(default = "default_me_route_hybrid_max_wait_ms")] + pub me_route_hybrid_max_wait_ms: u64, + + /// Maximum wait in milliseconds for blocking ME writer channel send fallback. + /// `0` keeps legacy unbounded wait behavior. + #[serde(default = "default_me_route_blocking_send_timeout_ms")] + pub me_route_blocking_send_timeout_ms: u64, + /// Number of inline recovery attempts in legacy mode. #[serde(default = "default_me_route_inline_recovery_attempts")] pub me_route_inline_recovery_attempts: u32, @@ -921,6 +935,7 @@ impl Default for GeneralConfig { me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(), me_route_channel_capacity: default_me_route_channel_capacity(), me_c2me_channel_capacity: default_me_c2me_channel_capacity(), + me_c2me_send_timeout_ms: default_me_c2me_send_timeout_ms(), me_reader_route_data_wait_ms: default_me_reader_route_data_wait_ms(), me_d2c_flush_batch_max_frames: default_me_d2c_flush_batch_max_frames(), me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(), @@ -975,6 +990,8 @@ impl Default for GeneralConfig { me_warn_rate_limit_ms: default_me_warn_rate_limit_ms(), me_route_no_writer_mode: MeRouteNoWriterMode::default(), me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(), + me_route_hybrid_max_wait_ms: default_me_route_hybrid_max_wait_ms(), + me_route_blocking_send_timeout_ms: default_me_route_blocking_send_timeout_ms(), me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(), me_route_inline_recovery_wait_ms: default_me_route_inline_recovery_wait_ms(), links: LinksConfig::default(), @@ -1207,6 +1224,11 @@ pub struct ServerConfig { /// 0 means unlimited. #[serde(default = "default_server_max_connections")] pub max_connections: u32, + + /// Maximum wait in milliseconds while acquiring a connection slot permit. + /// `0` keeps legacy unbounded wait behavior. 
+    #[serde(default = "default_accept_permit_timeout_ms")]
+    pub accept_permit_timeout_ms: u64,
 }
 
 impl Default for ServerConfig {
@@ -1226,6 +1248,7 @@
             api: ApiConfig::default(),
             listeners: Vec::new(),
             max_connections: default_server_max_connections(),
+            accept_permit_timeout_ms: default_accept_permit_timeout_ms(),
         }
     }
 }
diff --git a/src/maestro/helpers.rs b/src/maestro/helpers.rs
index 78f3ec4..f43e308 100644
--- a/src/maestro/helpers.rs
+++ b/src/maestro/helpers.rs
@@ -205,6 +205,7 @@ pub(crate) fn format_uptime(total_secs: u64) -> String {
     format!("{} / {} seconds", parts.join(", "), total_secs)
 }
 
+#[allow(dead_code)]
 pub(crate) async fn wait_until_admission_open(admission_rx: &mut watch::Receiver<bool>) -> bool {
     loop {
         if *admission_rx.borrow() {
diff --git a/src/maestro/listeners.rs b/src/maestro/listeners.rs
index 6296fd7..fe041d9 100644
--- a/src/maestro/listeners.rs
+++ b/src/maestro/listeners.rs
@@ -24,7 +24,7 @@ use crate::transport::{
     ListenOptions, UpstreamManager, create_listener, find_listener_processes,
 };
 
-use super::helpers::{is_expected_handshake_eof, print_proxy_links, wait_until_admission_open};
+use super::helpers::{is_expected_handshake_eof, print_proxy_links};
 
 pub(crate) struct BoundListeners {
     pub(crate) listeners: Vec<(TcpListener, bool)>,
@@ -195,7 +195,7 @@ pub(crate) async fn bind_listeners(
         has_unix_listener = true;
 
         let mut config_rx_unix: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
-        let mut admission_rx_unix = admission_rx.clone();
+        let admission_rx_unix = admission_rx.clone();
         let stats = stats.clone();
         let upstream_manager = upstream_manager.clone();
         let replay_checker = replay_checker.clone();
@@ -212,17 +212,44 @@
             let unix_conn_counter = Arc::new(std::sync::atomic::AtomicU64::new(1));
             loop {
-                if !wait_until_admission_open(&mut admission_rx_unix).await {
-                    warn!("Conditional-admission gate channel closed for unix listener");
-                    break;
-                }
                 match unix_listener.accept().await {
                     Ok((stream, _)) => {
-                        let permit = match max_connections_unix.clone().acquire_owned().await {
-                            Ok(permit) => permit,
-                            Err(_) => {
-                                error!("Connection limiter is closed");
-                                break;
+                        if !*admission_rx_unix.borrow() {
+                            drop(stream);
+                            continue;
+                        }
+                        let accept_permit_timeout_ms = config_rx_unix
+                            .borrow()
+                            .server
+                            .accept_permit_timeout_ms;
+                        let permit = if accept_permit_timeout_ms == 0 {
+                            match max_connections_unix.clone().acquire_owned().await {
+                                Ok(permit) => permit,
+                                Err(_) => {
+                                    error!("Connection limiter is closed");
+                                    break;
+                                }
+                            }
+                        } else {
+                            match tokio::time::timeout(
+                                Duration::from_millis(accept_permit_timeout_ms),
+                                max_connections_unix.clone().acquire_owned(),
+                            )
+                            .await
+                            {
+                                Ok(Ok(permit)) => permit,
+                                Ok(Err(_)) => {
+                                    error!("Connection limiter is closed");
+                                    break;
+                                }
+                                Err(_) => {
+                                    debug!(
+                                        timeout_ms = accept_permit_timeout_ms,
+                                        "Dropping accepted unix connection: permit wait timeout"
+                                    );
+                                    drop(stream);
+                                    continue;
+                                }
                             }
                         };
                         let conn_id =
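Both accept loops (unix above, tcp below) now share the same guardrail: the connection-slot semaphore is tried with an optional deadline, and on timeout the already-accepted socket is shed instead of parking the accept loop. A minimal self-contained sketch of that shape, with illustrative names rather than the patch's types:

    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::{OwnedSemaphorePermit, Semaphore};

    enum PermitOutcome {
        Acquired(OwnedSemaphorePermit),
        TimedOut, // caller drops the accepted socket and keeps accepting
        Closed,   // limiter is gone: caller exits the accept loop
    }

    /// `timeout_ms == 0` preserves the legacy unbounded wait.
    async fn acquire_slot(limiter: Arc<Semaphore>, timeout_ms: u64) -> PermitOutcome {
        if timeout_ms == 0 {
            return match limiter.acquire_owned().await {
                Ok(permit) => PermitOutcome::Acquired(permit),
                Err(_) => PermitOutcome::Closed,
            };
        }
        match tokio::time::timeout(Duration::from_millis(timeout_ms), limiter.acquire_owned()).await {
            Ok(Ok(permit)) => PermitOutcome::Acquired(permit),
            Ok(Err(_)) => PermitOutcome::Closed,
            Err(_) => PermitOutcome::TimedOut,
        }
    }

Dropping the socket on timeout trades a failed client connect for bounded memory: the kernel backlog, not the proxy, absorbs the burst.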
@@ -312,7 +339,7 @@ pub(crate) fn spawn_tcp_accept_loops(
 ) {
     for (listener, listener_proxy_protocol) in listeners {
         let mut config_rx: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
-        let mut admission_rx_tcp = admission_rx.clone();
+        let admission_rx_tcp = admission_rx.clone();
         let stats = stats.clone();
         let upstream_manager = upstream_manager.clone();
         let replay_checker = replay_checker.clone();
@@ -327,17 +354,46 @@
 
         tokio::spawn(async move {
             loop {
-                if !wait_until_admission_open(&mut admission_rx_tcp).await {
-                    warn!("Conditional-admission gate channel closed for tcp listener");
-                    break;
-                }
                 match listener.accept().await {
                     Ok((stream, peer_addr)) => {
-                        let permit = match max_connections_tcp.clone().acquire_owned().await {
-                            Ok(permit) => permit,
-                            Err(_) => {
-                                error!("Connection limiter is closed");
-                                break;
+                        if !*admission_rx_tcp.borrow() {
+                            debug!(peer = %peer_addr, "Admission gate closed, dropping connection");
+                            drop(stream);
+                            continue;
+                        }
+                        let accept_permit_timeout_ms = config_rx
+                            .borrow()
+                            .server
+                            .accept_permit_timeout_ms;
+                        let permit = if accept_permit_timeout_ms == 0 {
+                            match max_connections_tcp.clone().acquire_owned().await {
+                                Ok(permit) => permit,
+                                Err(_) => {
+                                    error!("Connection limiter is closed");
+                                    break;
+                                }
+                            }
+                        } else {
+                            match tokio::time::timeout(
+                                Duration::from_millis(accept_permit_timeout_ms),
+                                max_connections_tcp.clone().acquire_owned(),
+                            )
+                            .await
+                            {
+                                Ok(Ok(permit)) => permit,
+                                Ok(Err(_)) => {
+                                    error!("Connection limiter is closed");
+                                    break;
+                                }
+                                Err(_) => {
+                                    debug!(
+                                        peer = %peer_addr,
+                                        timeout_ms = accept_permit_timeout_ms,
+                                        "Dropping accepted connection: permit wait timeout"
+                                    );
+                                    drop(stream);
+                                    continue;
+                                }
                             }
                         };
                         let config = config_rx.borrow_and_update().clone();
diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs
index 94ae884..827b00c 100644
--- a/src/maestro/me_startup.rs
+++ b/src/maestro/me_startup.rs
@@ -267,6 +267,8 @@ pub(crate) async fn initialize_me_pool(
         config.general.me_warn_rate_limit_ms,
         config.general.me_route_no_writer_mode,
         config.general.me_route_no_writer_wait_ms,
+        config.general.me_route_hybrid_max_wait_ms,
+        config.general.me_route_blocking_send_timeout_ms,
         config.general.me_route_inline_recovery_attempts,
         config.general.me_route_inline_recovery_wait_ms,
     );
diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs
index 4f70a17..102b06c 100644
--- a/src/proxy/middle_relay.rs
+++ b/src/proxy/middle_relay.rs
@@ -222,6 +222,7 @@ fn should_yield_c2me_sender(sent_since_yield: usize, has_backlog: bool) -> bool
 async fn enqueue_c2me_command(
     tx: &mpsc::Sender<C2MeCommand>,
     cmd: C2MeCommand,
+    send_timeout: Duration,
 ) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
     match tx.try_send(cmd) {
         Ok(()) => Ok(()),
@@ -231,7 +232,17 @@
             if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
                 tokio::task::yield_now().await;
             }
-            tx.send(cmd).await
+            if send_timeout.is_zero() {
+                return tx.send(cmd).await;
+            }
+            match tokio::time::timeout(send_timeout, tx.reserve()).await {
+                Ok(Ok(permit)) => {
+                    permit.send(cmd);
+                    Ok(())
+                }
+                Ok(Err(_)) => Err(mpsc::error::SendError(cmd)),
+                Err(_) => Err(mpsc::error::SendError(cmd)),
+            }
         }
     }
 }
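The enqueue_c2me_command change above is an instance of a general pattern: optimistic try_send on the fast path, then a bounded wait for capacity via reserve() so the payload is not lost to a racing sender. A sketch of that pattern, generic over the payload type (the helper name is illustrative, not the patch's API):

    use std::time::Duration;
    use tokio::sync::mpsc;

    /// Fast path: lock-free try_send. Slow path: wait for a free slot, but only
    /// up to `send_timeout`; a zero timeout keeps the legacy unbounded wait.
    async fn enqueue_bounded<T>(
        tx: &mpsc::Sender<T>,
        value: T,
        send_timeout: Duration,
    ) -> Result<(), mpsc::error::SendError<T>> {
        match tx.try_send(value) {
            Ok(()) => Ok(()),
            Err(mpsc::error::TrySendError::Closed(v)) => Err(mpsc::error::SendError(v)),
            Err(mpsc::error::TrySendError::Full(v)) => {
                if send_timeout.is_zero() {
                    return tx.send(v).await;
                }
                match tokio::time::timeout(send_timeout, tx.reserve()).await {
                    Ok(Ok(permit)) => {
                        permit.send(v);
                        Ok(())
                    }
                    // Receiver dropped, or still full at the deadline: hand the
                    // value back to the caller either way.
                    Ok(Err(_)) | Err(_) => Err(mpsc::error::SendError(v)),
                }
            }
        }
    }

reserve() is the piece that makes the timeout safe: the value is moved into the channel only once a permit is actually held.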
@@ -355,6 +366,7 @@ where
         .general
         .me_c2me_channel_capacity
         .max(C2ME_CHANNEL_CAPACITY_FALLBACK);
+    let c2me_send_timeout = Duration::from_millis(config.general.me_c2me_send_timeout_ms);
     let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
     let me_pool_c2me = me_pool.clone();
     let effective_tag = effective_tag;
@@ -363,15 +375,42 @@ where
         while let Some(cmd) = c2me_rx.recv().await {
             match cmd {
                 C2MeCommand::Data { payload, flags } => {
-                    me_pool_c2me.send_proxy_req(
-                        conn_id,
-                        success.dc_idx,
-                        peer,
-                        translated_local_addr,
-                        payload.as_ref(),
-                        flags,
-                        effective_tag.as_deref(),
-                    ).await?;
+                    if c2me_send_timeout.is_zero() {
+                        me_pool_c2me
+                            .send_proxy_req(
+                                conn_id,
+                                success.dc_idx,
+                                peer,
+                                translated_local_addr,
+                                payload.as_ref(),
+                                flags,
+                                effective_tag.as_deref(),
+                            )
+                            .await?;
+                    } else {
+                        match tokio::time::timeout(
+                            c2me_send_timeout,
+                            me_pool_c2me.send_proxy_req(
+                                conn_id,
+                                success.dc_idx,
+                                peer,
+                                translated_local_addr,
+                                payload.as_ref(),
+                                flags,
+                                effective_tag.as_deref(),
+                            ),
+                        )
+                        .await
+                        {
+                            Ok(send_result) => send_result?,
+                            Err(_) => {
+                                return Err(ProxyError::Proxy(format!(
+                                    "ME send timeout after {}ms",
+                                    c2me_send_timeout.as_millis()
+                                )));
+                            }
+                        }
+                    }
                     sent_since_yield = sent_since_yield.saturating_add(1);
                     if should_yield_c2me_sender(sent_since_yield, !c2me_rx.is_empty()) {
                         sent_since_yield = 0;
@@ -555,7 +594,7 @@ where
     loop {
         if session_lease.is_stale() {
             stats.increment_reconnect_stale_close_total();
-            let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
+            let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
             main_result = Err(ProxyError::Proxy("Session evicted by reconnect".to_string()));
             break;
         }
@@ -573,7 +612,7 @@ where
                 "Cutover affected middle session, closing client connection"
             );
             tokio::time::sleep(delay).await;
-            let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
+            let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
             main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
             break;
         }
@@ -607,9 +646,13 @@
                         flags |= RPC_FLAG_NOT_ENCRYPTED;
                     }
                     // Keep client read loop lightweight: route heavy ME send path via a dedicated task.
-                    if enqueue_c2me_command(&c2me_tx, C2MeCommand::Data { payload, flags })
-                        .await
-                        .is_err()
+                    if enqueue_c2me_command(
+                        &c2me_tx,
+                        C2MeCommand::Data { payload, flags },
+                        c2me_send_timeout,
+                    )
+                    .await
+                    .is_err()
                     {
                         main_result = Err(ProxyError::Proxy("ME sender channel closed".into()));
                         break;
@@ -618,7 +661,12 @@
                 Ok(None) => {
                     debug!(conn_id, "Client EOF");
                     client_closed = true;
-                    let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
+                    let _ = enqueue_c2me_command(
+                        &c2me_tx,
+                        C2MeCommand::Close,
+                        c2me_send_timeout,
+                    )
+                    .await;
                     break;
                 }
                 Err(e) => {
@@ -993,6 +1041,7 @@ mod tests {
                 payload: Bytes::from_static(&[1, 2, 3]),
                 flags: 0,
             },
+            TokioDuration::from_millis(50),
         )
         .await
         .unwrap();
@@ -1028,6 +1077,7 @@
                 payload: Bytes::from_static(&[7, 7]),
                 flags: 7,
             },
+            TokioDuration::from_millis(100),
         )
         .await
         .unwrap();
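The C2ME data path above wraps send_proxy_req in tokio::time::timeout only when the configured value is nonzero; the same "zero disables the guard" convention recurs throughout this patch (accept permits, writer sends). The convention in isolation, as a hypothetical helper:

    use std::future::Future;
    use std::time::Duration;

    /// Await `fut` under an optional deadline. A zero `limit` means "no deadline",
    /// matching the legacy behavior of the knobs this patch adds; `None` = timed out.
    async fn with_optional_deadline<F: Future>(limit: Duration, fut: F) -> Option<F::Output> {
        if limit.is_zero() {
            return Some(fut.await);
        }
        tokio::time::timeout(limit, fut).await.ok()
    }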
diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs
index 862e58a..0b9b749 100644
--- a/src/transport/middle_proxy/health.rs
+++ b/src/transport/middle_proxy/health.rs
@@ -1574,6 +1574,8 @@ mod tests {
             general.me_warn_rate_limit_ms,
             MeRouteNoWriterMode::default(),
             general.me_route_no_writer_wait_ms,
+            general.me_route_hybrid_max_wait_ms,
+            general.me_route_blocking_send_timeout_ms,
             general.me_route_inline_recovery_attempts,
             general.me_route_inline_recovery_wait_ms,
         )
diff --git a/src/transport/middle_proxy/health_adversarial_tests.rs b/src/transport/middle_proxy/health_adversarial_tests.rs
index dc1a0eb..3f182e4 100644
--- a/src/transport/middle_proxy/health_adversarial_tests.rs
+++ b/src/transport/middle_proxy/health_adversarial_tests.rs
@@ -111,6 +111,8 @@ async fn make_pool(
         general.me_warn_rate_limit_ms,
         MeRouteNoWriterMode::default(),
         general.me_route_no_writer_wait_ms,
+        general.me_route_hybrid_max_wait_ms,
+        general.me_route_blocking_send_timeout_ms,
         general.me_route_inline_recovery_attempts,
         general.me_route_inline_recovery_wait_ms,
     );
diff --git a/src/transport/middle_proxy/health_integration_tests.rs b/src/transport/middle_proxy/health_integration_tests.rs
index 4724851..7f99d2a 100644
--- a/src/transport/middle_proxy/health_integration_tests.rs
+++ b/src/transport/middle_proxy/health_integration_tests.rs
@@ -110,6 +110,8 @@ async fn make_pool(
         general.me_warn_rate_limit_ms,
         MeRouteNoWriterMode::default(),
         general.me_route_no_writer_wait_ms,
+        general.me_route_hybrid_max_wait_ms,
+        general.me_route_blocking_send_timeout_ms,
         general.me_route_inline_recovery_attempts,
         general.me_route_inline_recovery_wait_ms,
     );
diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs
index 45a1eee..606f7e5 100644
--- a/src/transport/middle_proxy/health_regression_tests.rs
+++ b/src/transport/middle_proxy/health_regression_tests.rs
@@ -103,6 +103,8 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
         general.me_warn_rate_limit_ms,
         MeRouteNoWriterMode::default(),
         general.me_route_no_writer_wait_ms,
+        general.me_route_hybrid_max_wait_ms,
+        general.me_route_blocking_send_timeout_ms,
         general.me_route_inline_recovery_attempts,
         general.me_route_inline_recovery_wait_ms,
     )
diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs
index f3cc817..d09f07c 100644
--- a/src/transport/middle_proxy/pool.rs
+++ b/src/transport/middle_proxy/pool.rs
@@ -193,6 +193,8 @@ pub struct MePool {
     pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>,
     pub(super) me_route_no_writer_mode: AtomicU8,
     pub(super) me_route_no_writer_wait: Duration,
+    pub(super) me_route_hybrid_max_wait: Duration,
+    pub(super) me_route_blocking_send_timeout: Duration,
     pub(super) me_route_inline_recovery_attempts: u32,
     pub(super) me_route_inline_recovery_wait: Duration,
     pub(super) me_health_interval_ms_unhealthy: AtomicU64,
@@ -307,6 +309,8 @@ impl MePool {
         me_warn_rate_limit_ms: u64,
         me_route_no_writer_mode: MeRouteNoWriterMode,
         me_route_no_writer_wait_ms: u64,
+        me_route_hybrid_max_wait_ms: u64,
+        me_route_blocking_send_timeout_ms: u64,
         me_route_inline_recovery_attempts: u32,
         me_route_inline_recovery_wait_ms: u64,
     ) -> Arc<MePool> {
@@ -490,6 +494,10 @@ impl MePool {
             me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)),
             me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
             me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
+            me_route_hybrid_max_wait: Duration::from_millis(me_route_hybrid_max_wait_ms),
+            me_route_blocking_send_timeout: Duration::from_millis(
+                me_route_blocking_send_timeout_ms,
+            ),
             me_route_inline_recovery_attempts,
             me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
             me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
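The pool_writer.rs hunk below narrows the ping-tracker lock scope: whether to run the periodic sweep is decided lock-free first, via compare_exchange on an epoch-milliseconds atomic, and the tracker mutex is taken only after the ping frame is written. A self-contained sketch of the claim step, assuming the 30-second period the hunk uses (the function name is illustrative):

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Returns true for at most one caller per `period_ms` window; everyone else
    /// skips the cleanup. Losing the compare_exchange race means another task won.
    fn claim_cleanup_slot(last_cleanup_epoch_ms: &AtomicU64, period_ms: u64) -> bool {
        let now_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;
        let last = last_cleanup_epoch_ms.load(Ordering::Relaxed);
        now_ms.saturating_sub(last) >= period_ms
            && last_cleanup_epoch_ms
                .compare_exchange(last, now_ms, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
    }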
diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs
index 8ce3de3..4035111 100644
--- a/src/transport/middle_proxy/pool_writer.rs
+++ b/src/transport/middle_proxy/pool_writer.rs
@@ -312,41 +312,28 @@ impl MePool {
                 let mut p = Vec::with_capacity(12);
                 p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
                 p.extend_from_slice(&sent_id.to_le_bytes());
-                {
-                    let mut tracker = ping_tracker_ping.lock().await;
-                    let now_epoch_ms = std::time::SystemTime::now()
-                        .duration_since(std::time::UNIX_EPOCH)
-                        .unwrap_or_default()
-                        .as_millis() as u64;
-                    let mut run_cleanup = false;
-                    if let Some(pool) = pool_ping.upgrade() {
-                        let last_cleanup_ms = pool
+                let now_epoch_ms = std::time::SystemTime::now()
+                    .duration_since(std::time::UNIX_EPOCH)
+                    .unwrap_or_default()
+                    .as_millis() as u64;
+                let mut run_cleanup = false;
+                if let Some(pool) = pool_ping.upgrade() {
+                    let last_cleanup_ms = pool
+                        .ping_tracker_last_cleanup_epoch_ms
+                        .load(Ordering::Relaxed);
+                    if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000
+                        && pool
                             .ping_tracker_last_cleanup_epoch_ms
-                            .load(Ordering::Relaxed);
-                        if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000
-                            && pool
-                                .ping_tracker_last_cleanup_epoch_ms
-                                .compare_exchange(
-                                    last_cleanup_ms,
-                                    now_epoch_ms,
-                                    Ordering::AcqRel,
-                                    Ordering::Relaxed,
-                                )
-                                .is_ok()
-                        {
-                            run_cleanup = true;
-                        }
+                            .compare_exchange(
+                                last_cleanup_ms,
+                                now_epoch_ms,
+                                Ordering::AcqRel,
+                                Ordering::Relaxed,
+                            )
+                            .is_ok()
+                    {
+                        run_cleanup = true;
                     }
-
-                    if run_cleanup {
-                        let before = tracker.len();
-                        tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
-                        let expired = before.saturating_sub(tracker.len());
-                        if expired > 0 {
-                            stats_ping.increment_me_keepalive_timeout_by(expired as u64);
-                        }
-                    }
-                    tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
                 }
                 ping_id = ping_id.wrapping_add(1);
                 stats_ping.increment_me_keepalive_sent();
@@ -367,6 +354,16 @@
                     }
                     break;
                 }
+                let mut tracker = ping_tracker_ping.lock().await;
+                if run_cleanup {
+                    let before = tracker.len();
+                    tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
+                    let expired = before.saturating_sub(tracker.len());
+                    if expired > 0 {
+                        stats_ping.increment_me_keepalive_timeout_by(expired as u64);
+                    }
+                }
+                tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
             }
         });
diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs
index 0f9fed6..1c255ef 100644
--- a/src/transport/middle_proxy/send.rs
+++ b/src/transport/middle_proxy/send.rs
@@ -6,6 +6,7 @@ use std::sync::atomic::Ordering;
 use std::time::{Duration, Instant};
 
 use bytes::Bytes;
+use tokio::sync::mpsc;
 use tokio::sync::mpsc::error::TrySendError;
 use tracing::{debug, warn};
 
@@ -29,6 +30,29 @@
 const PICK_PENALTY_DRAINING: u64 = 600;
 const PICK_PENALTY_STALE: u64 = 300;
 const PICK_PENALTY_DEGRADED: u64 = 250;
 
+enum TimedSendError<T> {
+    Closed(T),
+    Timeout(T),
+}
+
+async fn send_writer_command_with_timeout(
+    tx: &mpsc::Sender<WriterCommand>,
+    cmd: WriterCommand,
+    timeout: Duration,
+) -> std::result::Result<(), TimedSendError<WriterCommand>> {
+    if timeout.is_zero() {
+        return tx.send(cmd).await.map_err(|err| TimedSendError::Closed(err.0));
+    }
+    match tokio::time::timeout(timeout, tx.reserve()).await {
+        Ok(Ok(permit)) => {
+            permit.send(cmd);
+            Ok(())
+        }
+        Ok(Err(_)) => Err(TimedSendError::Closed(cmd)),
+        Err(_) => Err(TimedSendError::Timeout(cmd)),
+    }
+}
+
 impl MePool {
     /// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
    pub async fn send_proxy_req(
@@ -78,8 +102,18 @@
         let mut hybrid_last_recovery_at: Option<Instant> = None;
         let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50));
         let mut hybrid_wait_current = hybrid_wait_step;
+        let hybrid_deadline = Instant::now() + self.me_route_hybrid_max_wait;
 
         loop {
+            if matches!(no_writer_mode, MeRouteNoWriterMode::HybridAsyncPersistent)
+                && Instant::now() >= hybrid_deadline
+            {
+                self.stats.increment_me_no_writer_failfast_total();
+                return Err(ProxyError::Proxy(
+                    "No ME writer available in hybrid wait window".into(),
+                ));
+            }
+            let mut skip_writer_id: Option<u64> = None;
             let current_meta = self
                 .registry
                 .get_meta(conn_id)
@@ -90,12 +124,30 @@
             match current.tx.try_send(WriterCommand::Data(current_payload.clone())) {
                 Ok(()) => return Ok(()),
                 Err(TrySendError::Full(cmd)) => {
-                    if current.tx.send(cmd).await.is_ok() {
-                        return Ok(());
+                    match send_writer_command_with_timeout(
+                        &current.tx,
+                        cmd,
+                        self.me_route_blocking_send_timeout,
+                    )
+                    .await
+                    {
+                        Ok(()) => return Ok(()),
+                        Err(TimedSendError::Closed(_)) => {
+                            warn!(writer_id = current.writer_id, "ME writer channel closed");
+                            self.remove_writer_and_close_clients(current.writer_id).await;
+                            continue;
+                        }
+                        Err(TimedSendError::Timeout(_)) => {
+                            debug!(
+                                conn_id,
+                                writer_id = current.writer_id,
+                                timeout_ms = self.me_route_blocking_send_timeout.as_millis()
+                                    as u64,
+                                "ME writer send timed out for bound writer, trying reroute"
+                            );
+                            skip_writer_id = Some(current.writer_id);
+                        }
                     }
-                    warn!(writer_id = current.writer_id, "ME writer channel closed");
-                    self.remove_writer_and_close_clients(current.writer_id).await;
-                    continue;
                 }
                 Err(TrySendError::Closed(_)) => {
                     warn!(writer_id = current.writer_id, "ME writer channel closed");
@@ -200,6 +252,9 @@
                 .candidate_indices_for_dc(&writers_snapshot, routed_dc, true)
                 .await;
         }
+        if let Some(skip_writer_id) = skip_writer_id {
+            candidate_indices.retain(|idx| writers_snapshot[*idx].id != skip_writer_id);
+        }
         if candidate_indices.is_empty() {
             let pick_mode = self.writer_pick_mode();
             match no_writer_mode {
@@ -422,7 +477,13 @@
             self.stats.increment_me_writer_pick_blocking_fallback_total();
             let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
             let (payload, meta) = build_routed_payload(effective_our_addr);
-            match w.tx.send(WriterCommand::Data(payload.clone())).await {
+            match send_writer_command_with_timeout(
+                &w.tx,
+                WriterCommand::Data(payload.clone()),
+                self.me_route_blocking_send_timeout,
+            )
+            .await
+            {
                 Ok(()) => {
                     self.stats
                         .increment_me_writer_pick_success_fallback_total(pick_mode);
@@ -439,11 +500,20 @@
                     }
                     return Ok(());
                 }
-                Err(_) => {
+                Err(TimedSendError::Closed(_)) => {
                     self.stats.increment_me_writer_pick_closed_total(pick_mode);
                     warn!(writer_id = w.id, "ME writer channel closed (blocking)");
                     self.remove_writer_and_close_clients(w.id).await;
                 }
+                Err(TimedSendError::Timeout(_)) => {
+                    self.stats.increment_me_writer_pick_full_total(pick_mode);
+                    debug!(
+                        conn_id,
+                        writer_id = w.id,
+                        timeout_ms = self.me_route_blocking_send_timeout.as_millis() as u64,
+                        "ME writer blocking fallback send timed out"
+                    );
+                }
             }
         }
     }
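Read in isolation, the failfast guard added to send_proxy_req turns hybrid no-writer mode from "retry forever" into "retry against one cumulative budget" (me_route_hybrid_max_wait_ms). A standalone sketch of that loop shape; the availability probe and the fixed step are hypothetical stand-ins for the patch's registry checks and adaptive wait:

    use std::time::{Duration, Instant};

    // Hypothetical stand-in for "does the pool currently have a usable writer?".
    fn writer_available() -> bool {
        false
    }

    /// Retry with bounded per-step waits, but give up once the cumulative budget
    /// is exhausted, instead of blocking the caller indefinitely.
    async fn wait_for_writer(budget: Duration, step: Duration) -> Result<(), &'static str> {
        let deadline = Instant::now() + budget;
        loop {
            if writer_available() {
                return Ok(());
            }
            let now = Instant::now();
            if now >= deadline {
                // Mirrors the patch's failfast path (and its stats counter bump).
                return Err("No ME writer available in hybrid wait window");
            }
            // Never sleep past the deadline.
            tokio::time::sleep(step.min(deadline - now)).await;
        }
    }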