Runtime guardrails
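
Bounds the previously unbounded waits on the hot paths: accept-permit acquisition (`server.accept_permit_timeout_ms`), C2ME command enqueue (`general.me_c2me_send_timeout_ms`), and the blocking ME writer send fallback (`general.me_route_blocking_send_timeout_ms`), plus a cumulative failfast window for hybrid no-writer routing (`general.me_route_hybrid_max_wait_ms`). For the three send/permit timeouts, `0` keeps the legacy unbounded behavior.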

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>

Author: Alexey
Date: 2026-03-18 22:33:41 +03:00
Parent: 1544e3fcff
Commit: 89e5668c7e

15 changed files with 345 additions and 77 deletions

View File

@@ -36,12 +36,16 @@ const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000;
 const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000;
 const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000;
 const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000;
+const DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS: u64 = 3000;
+const DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS: u64 = 250;
+const DEFAULT_ME_C2ME_SEND_TIMEOUT_MS: u64 = 4000;
 const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true;
 const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30;
 const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1;
 const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8;
 const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000;
 const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
+const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250;
 const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
 const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
 const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000;
@@ -156,6 +160,10 @@ pub(crate) fn default_server_max_connections() -> u32 {
     10_000
 }
+pub(crate) fn default_accept_permit_timeout_ms() -> u64 {
+    DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS
+}
 pub(crate) fn default_prefer_4() -> u8 {
     4
 }
@@ -380,6 +388,18 @@ pub(crate) fn default_me_warn_rate_limit_ms() -> u64 {
     DEFAULT_ME_WARN_RATE_LIMIT_MS
 }
+pub(crate) fn default_me_route_hybrid_max_wait_ms() -> u64 {
+    DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS
+}
+pub(crate) fn default_me_route_blocking_send_timeout_ms() -> u64 {
+    DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS
+}
+pub(crate) fn default_me_c2me_send_timeout_ms() -> u64 {
+    DEFAULT_ME_C2ME_SEND_TIMEOUT_MS
+}
 pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
     DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
 }

View File

@@ -612,6 +612,8 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
         || old.server.listen_tcp != new.server.listen_tcp
         || old.server.listen_unix_sock != new.server.listen_unix_sock
         || old.server.listen_unix_sock_perm != new.server.listen_unix_sock_perm
+        || old.server.max_connections != new.server.max_connections
+        || old.server.accept_permit_timeout_ms != new.server.accept_permit_timeout_ms
     {
         warned = true;
         warn!("config reload: server listener settings changed; restart required");
@@ -671,6 +673,9 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
     }
     if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode
         || old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms
+        || old.general.me_route_hybrid_max_wait_ms != new.general.me_route_hybrid_max_wait_ms
+        || old.general.me_route_blocking_send_timeout_ms
+            != new.general.me_route_blocking_send_timeout_ms
         || old.general.me_route_inline_recovery_attempts
             != new.general.me_route_inline_recovery_attempts
         || old.general.me_route_inline_recovery_wait_ms
@@ -679,6 +684,10 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
         warned = true;
         warn!("config reload: general.me_route_no_writer_* changed; restart required");
     }
+    if old.general.me_c2me_send_timeout_ms != new.general.me_c2me_send_timeout_ms {
+        warned = true;
+        warn!("config reload: general.me_c2me_send_timeout_ms changed; restart required");
+    }
     if old.general.unknown_dc_log_path != new.general.unknown_dc_log_path
         || old.general.unknown_dc_file_log_enabled != new.general.unknown_dc_file_log_enabled
     {

View File

@@ -346,6 +346,12 @@ impl ProxyConfig {
             ));
         }
+        if config.general.me_c2me_send_timeout_ms > 60_000 {
+            return Err(ProxyError::Config(
+                "general.me_c2me_send_timeout_ms must be within [0, 60000]".to_string(),
+            ));
+        }
         if config.general.me_reader_route_data_wait_ms > 20 {
             return Err(ProxyError::Config(
                 "general.me_reader_route_data_wait_ms must be within [0, 20]".to_string(),
@@ -627,6 +633,18 @@ impl ProxyConfig {
             ));
         }
+        if !(50..=60_000).contains(&config.general.me_route_hybrid_max_wait_ms) {
+            return Err(ProxyError::Config(
+                "general.me_route_hybrid_max_wait_ms must be within [50, 60000]".to_string(),
+            ));
+        }
+        if config.general.me_route_blocking_send_timeout_ms > 5000 {
+            return Err(ProxyError::Config(
+                "general.me_route_blocking_send_timeout_ms must be within [0, 5000]".to_string(),
+            ));
+        }
         if !(2..=4).contains(&config.general.me_writer_pick_sample_size) {
             return Err(ProxyError::Config(
                 "general.me_writer_pick_sample_size must be within [2, 4]".to_string(),
@@ -687,6 +705,12 @@ impl ProxyConfig {
             ));
         }
+        if config.server.accept_permit_timeout_ms > 60_000 {
+            return Err(ProxyError::Config(
+                "server.accept_permit_timeout_ms must be within [0, 60000]".to_string(),
+            ));
+        }
         if config.general.effective_me_pool_force_close_secs() > 0
             && config.general.effective_me_pool_force_close_secs()
                 < config.general.me_pool_drain_ttl_secs
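
The added checks reuse the file's range-validation idiom. Note that the hybrid window deliberately has a lower bound of 50 ms, so unlike the two send timeouts it cannot be disabled with `0`. A standalone sketch of the same shape (error type simplified to a `String` here; the real code returns `ProxyError::Config`):

```rust
// Illustrative only: mirrors the [50, 60000] check above with a plain String error.
fn validate_hybrid_max_wait(ms: u64) -> Result<(), String> {
    if !(50..=60_000).contains(&ms) {
        return Err("general.me_route_hybrid_max_wait_ms must be within [50, 60000]".to_string());
    }
    Ok(())
}

fn main() {
    assert!(validate_hybrid_max_wait(3_000).is_ok()); // the new default passes
    assert!(validate_hybrid_max_wait(0).is_err()); // unlike the send timeouts, 0 is rejected
    assert!(validate_hybrid_max_wait(60_001).is_err()); // above the cap
}
```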

View File

@@ -462,6 +462,11 @@ pub struct GeneralConfig {
     #[serde(default = "default_me_c2me_channel_capacity")]
     pub me_c2me_channel_capacity: usize,
+    /// Maximum wait in milliseconds for enqueueing C2ME commands when the queue is full.
+    /// `0` keeps legacy unbounded wait behavior.
+    #[serde(default = "default_me_c2me_send_timeout_ms")]
+    pub me_c2me_send_timeout_ms: u64,
     /// Bounded wait in milliseconds for routing ME DATA to per-connection queue.
     /// `0` keeps legacy no-wait behavior.
     #[serde(default = "default_me_reader_route_data_wait_ms")]
@@ -716,6 +721,15 @@ pub struct GeneralConfig {
     #[serde(default = "default_me_route_no_writer_wait_ms")]
     pub me_route_no_writer_wait_ms: u64,
+    /// Maximum cumulative wait in milliseconds for hybrid no-writer mode before failfast.
+    #[serde(default = "default_me_route_hybrid_max_wait_ms")]
+    pub me_route_hybrid_max_wait_ms: u64,
+    /// Maximum wait in milliseconds for blocking ME writer channel send fallback.
+    /// `0` keeps legacy unbounded wait behavior.
+    #[serde(default = "default_me_route_blocking_send_timeout_ms")]
+    pub me_route_blocking_send_timeout_ms: u64,
     /// Number of inline recovery attempts in legacy mode.
     #[serde(default = "default_me_route_inline_recovery_attempts")]
     pub me_route_inline_recovery_attempts: u32,
@@ -921,6 +935,7 @@ impl Default for GeneralConfig {
             me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(),
             me_route_channel_capacity: default_me_route_channel_capacity(),
             me_c2me_channel_capacity: default_me_c2me_channel_capacity(),
+            me_c2me_send_timeout_ms: default_me_c2me_send_timeout_ms(),
             me_reader_route_data_wait_ms: default_me_reader_route_data_wait_ms(),
             me_d2c_flush_batch_max_frames: default_me_d2c_flush_batch_max_frames(),
             me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(),
@@ -975,6 +990,8 @@
             me_warn_rate_limit_ms: default_me_warn_rate_limit_ms(),
             me_route_no_writer_mode: MeRouteNoWriterMode::default(),
             me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(),
+            me_route_hybrid_max_wait_ms: default_me_route_hybrid_max_wait_ms(),
+            me_route_blocking_send_timeout_ms: default_me_route_blocking_send_timeout_ms(),
             me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(),
             me_route_inline_recovery_wait_ms: default_me_route_inline_recovery_wait_ms(),
             links: LinksConfig::default(),
@@ -1207,6 +1224,11 @@
     /// 0 means unlimited.
     #[serde(default = "default_server_max_connections")]
     pub max_connections: u32,
+    /// Maximum wait in milliseconds while acquiring a connection slot permit.
+    /// `0` keeps legacy unbounded wait behavior.
+    #[serde(default = "default_accept_permit_timeout_ms")]
+    pub accept_permit_timeout_ms: u64,
 }
@@ -1226,6 +1248,7 @@ impl Default for ServerConfig {
         api: ApiConfig::default(),
         listeners: Vec::new(),
         max_connections: default_server_max_connections(),
+        accept_permit_timeout_ms: default_accept_permit_timeout_ms(),
     }
 }
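
Because every new field carries a `serde(default = "…")` attribute, configs written before this commit keep deserializing, with the new defaults filled in. A minimal sketch of that behavior; the TOML source format, struct, and helper below are illustrative stand-ins, not the project's real types:

```rust
use serde::Deserialize;

fn default_accept_permit_timeout_ms() -> u64 {
    250 // mirrors DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS
}

#[derive(Deserialize)]
struct ServerSection {
    #[serde(default = "default_accept_permit_timeout_ms")]
    accept_permit_timeout_ms: u64,
}

fn main() {
    // Key absent: serde calls the default fn, so old configs stay valid.
    let s: ServerSection = toml::from_str("").unwrap();
    assert_eq!(s.accept_permit_timeout_ms, 250);

    // Key present: the operator's value wins; 0 restores the legacy unbounded wait.
    let s: ServerSection = toml::from_str("accept_permit_timeout_ms = 0").unwrap();
    assert_eq!(s.accept_permit_timeout_ms, 0);
}
```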

View File

@@ -205,6 +205,7 @@ pub(crate) fn format_uptime(total_secs: u64) -> String {
     format!("{} / {} seconds", parts.join(", "), total_secs)
 }
+#[allow(dead_code)]
 pub(crate) async fn wait_until_admission_open(admission_rx: &mut watch::Receiver<bool>) -> bool {
     loop {
         if *admission_rx.borrow() {

View File

@@ -24,7 +24,7 @@ use crate::transport::{
     ListenOptions, UpstreamManager, create_listener, find_listener_processes,
 };
-use super::helpers::{is_expected_handshake_eof, print_proxy_links, wait_until_admission_open};
+use super::helpers::{is_expected_handshake_eof, print_proxy_links};
 pub(crate) struct BoundListeners {
     pub(crate) listeners: Vec<(TcpListener, bool)>,
@@ -195,7 +195,7 @@ pub(crate) async fn bind_listeners(
         has_unix_listener = true;
         let mut config_rx_unix: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
-        let mut admission_rx_unix = admission_rx.clone();
+        let admission_rx_unix = admission_rx.clone();
         let stats = stats.clone();
         let upstream_manager = upstream_manager.clone();
         let replay_checker = replay_checker.clone();
@@ -212,17 +212,44 @@ pub(crate) async fn bind_listeners(
             let unix_conn_counter = Arc::new(std::sync::atomic::AtomicU64::new(1));
             loop {
-                if !wait_until_admission_open(&mut admission_rx_unix).await {
-                    warn!("Conditional-admission gate channel closed for unix listener");
-                    break;
-                }
                 match unix_listener.accept().await {
                     Ok((stream, _)) => {
-                        let permit = match max_connections_unix.clone().acquire_owned().await {
-                            Ok(permit) => permit,
-                            Err(_) => {
-                                error!("Connection limiter is closed");
-                                break;
-                            }
+                        if !*admission_rx_unix.borrow() {
+                            drop(stream);
+                            continue;
+                        }
+                        let accept_permit_timeout_ms = config_rx_unix
+                            .borrow()
+                            .server
+                            .accept_permit_timeout_ms;
+                        let permit = if accept_permit_timeout_ms == 0 {
+                            match max_connections_unix.clone().acquire_owned().await {
+                                Ok(permit) => permit,
+                                Err(_) => {
+                                    error!("Connection limiter is closed");
+                                    break;
+                                }
+                            }
+                        } else {
+                            match tokio::time::timeout(
+                                Duration::from_millis(accept_permit_timeout_ms),
+                                max_connections_unix.clone().acquire_owned(),
+                            )
+                            .await
+                            {
+                                Ok(Ok(permit)) => permit,
+                                Ok(Err(_)) => {
+                                    error!("Connection limiter is closed");
+                                    break;
+                                }
+                                Err(_) => {
+                                    debug!(
+                                        timeout_ms = accept_permit_timeout_ms,
+                                        "Dropping accepted unix connection: permit wait timeout"
+                                    );
+                                    drop(stream);
+                                    continue;
+                                }
+                            }
                         };
                         let conn_id =
@@ -312,7 +339,7 @@ pub(crate) fn spawn_tcp_accept_loops(
 ) {
     for (listener, listener_proxy_protocol) in listeners {
         let mut config_rx: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
-        let mut admission_rx_tcp = admission_rx.clone();
+        let admission_rx_tcp = admission_rx.clone();
         let stats = stats.clone();
         let upstream_manager = upstream_manager.clone();
         let replay_checker = replay_checker.clone();
@@ -327,17 +354,46 @@
         tokio::spawn(async move {
             loop {
-                if !wait_until_admission_open(&mut admission_rx_tcp).await {
-                    warn!("Conditional-admission gate channel closed for tcp listener");
-                    break;
-                }
                 match listener.accept().await {
                     Ok((stream, peer_addr)) => {
-                        let permit = match max_connections_tcp.clone().acquire_owned().await {
-                            Ok(permit) => permit,
-                            Err(_) => {
-                                error!("Connection limiter is closed");
-                                break;
-                            }
+                        if !*admission_rx_tcp.borrow() {
+                            debug!(peer = %peer_addr, "Admission gate closed, dropping connection");
+                            drop(stream);
+                            continue;
+                        }
+                        let accept_permit_timeout_ms = config_rx
+                            .borrow()
+                            .server
+                            .accept_permit_timeout_ms;
+                        let permit = if accept_permit_timeout_ms == 0 {
+                            match max_connections_tcp.clone().acquire_owned().await {
+                                Ok(permit) => permit,
+                                Err(_) => {
+                                    error!("Connection limiter is closed");
+                                    break;
+                                }
+                            }
+                        } else {
+                            match tokio::time::timeout(
+                                Duration::from_millis(accept_permit_timeout_ms),
+                                max_connections_tcp.clone().acquire_owned(),
+                            )
+                            .await
+                            {
+                                Ok(Ok(permit)) => permit,
+                                Ok(Err(_)) => {
+                                    error!("Connection limiter is closed");
+                                    break;
+                                }
+                                Err(_) => {
+                                    debug!(
+                                        peer = %peer_addr,
+                                        timeout_ms = accept_permit_timeout_ms,
+                                        "Dropping accepted connection: permit wait timeout"
+                                    );
+                                    drop(stream);
+                                    continue;
+                                }
+                            }
                         };
                         let config = config_rx.borrow_and_update().clone();
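
Both accept loops now share the same guardrail: when the connection limiter is exhausted, the loop waits at most `accept_permit_timeout_ms` for a permit and then drops the freshly accepted stream instead of stalling the listener. A runnable reduction of that core, using only Tokio (names here are illustrative):

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // One-permit limiter with the permit already taken: the worst case for accept.
    let limiter = Arc::new(Semaphore::new(1));
    let _held = limiter.clone().acquire_owned().await.unwrap();

    let accept_permit_timeout_ms = 250u64; // server.accept_permit_timeout_ms
    let waited = tokio::time::timeout(
        Duration::from_millis(accept_permit_timeout_ms),
        limiter.clone().acquire_owned(),
    )
    .await;

    // Err(Elapsed) is where the real loop logs, drops the stream, and continues,
    // rather than queueing every later accept behind this one.
    assert!(waited.is_err());
}
```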

View File

@@ -267,6 +267,8 @@ pub(crate) async fn initialize_me_pool(
         config.general.me_warn_rate_limit_ms,
         config.general.me_route_no_writer_mode,
         config.general.me_route_no_writer_wait_ms,
+        config.general.me_route_hybrid_max_wait_ms,
+        config.general.me_route_blocking_send_timeout_ms,
         config.general.me_route_inline_recovery_attempts,
         config.general.me_route_inline_recovery_wait_ms,
     );

View File

@@ -222,6 +222,7 @@ fn should_yield_c2me_sender(sent_since_yield: usize, has_backlog: bool) -> bool
 async fn enqueue_c2me_command(
     tx: &mpsc::Sender<C2MeCommand>,
     cmd: C2MeCommand,
+    send_timeout: Duration,
 ) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
     match tx.try_send(cmd) {
         Ok(()) => Ok(()),
@@ -231,7 +232,17 @@ async fn enqueue_c2me_command(
             if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
                 tokio::task::yield_now().await;
             }
-            tx.send(cmd).await
+            if send_timeout.is_zero() {
+                return tx.send(cmd).await;
+            }
+            match tokio::time::timeout(send_timeout, tx.reserve()).await {
+                Ok(Ok(permit)) => {
+                    permit.send(cmd);
+                    Ok(())
+                }
+                Ok(Err(_)) => Err(mpsc::error::SendError(cmd)),
+                Err(_) => Err(mpsc::error::SendError(cmd)),
+            }
         }
     }
 }
@@ -355,6 +366,7 @@ where
             .general
             .me_c2me_channel_capacity
             .max(C2ME_CHANNEL_CAPACITY_FALLBACK);
+        let c2me_send_timeout = Duration::from_millis(config.general.me_c2me_send_timeout_ms);
         let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
         let me_pool_c2me = me_pool.clone();
         let effective_tag = effective_tag;
@@ -363,15 +375,42 @@ where
             while let Some(cmd) = c2me_rx.recv().await {
                 match cmd {
                     C2MeCommand::Data { payload, flags } => {
-                        me_pool_c2me.send_proxy_req(
-                            conn_id,
-                            success.dc_idx,
-                            peer,
-                            translated_local_addr,
-                            payload.as_ref(),
-                            flags,
-                            effective_tag.as_deref(),
-                        ).await?;
+                        if c2me_send_timeout.is_zero() {
+                            me_pool_c2me
+                                .send_proxy_req(
+                                    conn_id,
+                                    success.dc_idx,
+                                    peer,
+                                    translated_local_addr,
+                                    payload.as_ref(),
+                                    flags,
+                                    effective_tag.as_deref(),
+                                )
+                                .await?;
+                        } else {
+                            match tokio::time::timeout(
+                                c2me_send_timeout,
+                                me_pool_c2me.send_proxy_req(
+                                    conn_id,
+                                    success.dc_idx,
+                                    peer,
+                                    translated_local_addr,
+                                    payload.as_ref(),
+                                    flags,
+                                    effective_tag.as_deref(),
+                                ),
+                            )
+                            .await
+                            {
+                                Ok(send_result) => send_result?,
+                                Err(_) => {
+                                    return Err(ProxyError::Proxy(format!(
+                                        "ME send timeout after {}ms",
+                                        c2me_send_timeout.as_millis()
+                                    )));
+                                }
+                            }
+                        }
                         sent_since_yield = sent_since_yield.saturating_add(1);
                         if should_yield_c2me_sender(sent_since_yield, !c2me_rx.is_empty()) {
                             sent_since_yield = 0;
@@ -555,7 +594,7 @@ where
         loop {
             if session_lease.is_stale() {
                 stats.increment_reconnect_stale_close_total();
-                let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
+                let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
                 main_result = Err(ProxyError::Proxy("Session evicted by reconnect".to_string()));
                 break;
             }
@@ -573,7 +612,7 @@ where
                     "Cutover affected middle session, closing client connection"
                 );
                 tokio::time::sleep(delay).await;
-                let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
+                let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
                 main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
                 break;
             }
@@ -607,9 +646,13 @@ where
                         flags |= RPC_FLAG_NOT_ENCRYPTED;
                     }
                     // Keep client read loop lightweight: route heavy ME send path via a dedicated task.
-                    if enqueue_c2me_command(&c2me_tx, C2MeCommand::Data { payload, flags })
-                        .await
-                        .is_err()
+                    if enqueue_c2me_command(
+                        &c2me_tx,
+                        C2MeCommand::Data { payload, flags },
+                        c2me_send_timeout,
+                    )
+                    .await
+                    .is_err()
                     {
                         main_result = Err(ProxyError::Proxy("ME sender channel closed".into()));
                         break;
@@ -618,7 +661,12 @@ where
                 Ok(None) => {
                     debug!(conn_id, "Client EOF");
                     client_closed = true;
-                    let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
+                    let _ = enqueue_c2me_command(
+                        &c2me_tx,
+                        C2MeCommand::Close,
+                        c2me_send_timeout,
+                    )
+                    .await;
                     break;
                 }
                 Err(e) => {
@@ -993,6 +1041,7 @@ mod tests {
                 payload: Bytes::from_static(&[1, 2, 3]),
                 flags: 0,
             },
+            TokioDuration::from_millis(50),
         )
         .await
         .unwrap();
@@ -1028,6 +1077,7 @@ mod tests {
                 payload: Bytes::from_static(&[7, 7]),
                 flags: 7,
            },
+            TokioDuration::from_millis(100),
         )
         .await
         .unwrap();
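
Two different timeout shapes appear in this file: `enqueue_c2me_command` bounds only the wait for queue capacity (`tx.reserve()`), while the C2ME data task wraps the entire `send_proxy_req` future, so a send already in flight is abandoned once the budget elapses. A reduced sketch of that second shape, with a stand-in future in place of the real call:

```rust
use std::time::Duration;

// Stand-in for a send_proxy_req call that has stalled downstream.
async fn slow_send() -> Result<(), String> {
    tokio::time::sleep(Duration::from_secs(10)).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    let c2me_send_timeout = Duration::from_millis(100);
    let result = match tokio::time::timeout(c2me_send_timeout, slow_send()).await {
        // The inner result is propagated as-is when the send finishes in time.
        Ok(send_result) => send_result,
        // On expiry the whole future is dropped; the real code turns this
        // into a ProxyError and tears the client connection down.
        Err(_) => Err(format!(
            "ME send timeout after {}ms",
            c2me_send_timeout.as_millis()
        )),
    };
    assert!(result.is_err());
}
```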

View File

@@ -1574,6 +1574,8 @@ mod tests {
             general.me_warn_rate_limit_ms,
             MeRouteNoWriterMode::default(),
             general.me_route_no_writer_wait_ms,
+            general.me_route_hybrid_max_wait_ms,
+            general.me_route_blocking_send_timeout_ms,
             general.me_route_inline_recovery_attempts,
             general.me_route_inline_recovery_wait_ms,
         )

View File

@@ -111,6 +111,8 @@ async fn make_pool(
             general.me_warn_rate_limit_ms,
             MeRouteNoWriterMode::default(),
             general.me_route_no_writer_wait_ms,
+            general.me_route_hybrid_max_wait_ms,
+            general.me_route_blocking_send_timeout_ms,
             general.me_route_inline_recovery_attempts,
             general.me_route_inline_recovery_wait_ms,
         );

View File

@@ -110,6 +110,8 @@ async fn make_pool(
             general.me_warn_rate_limit_ms,
             MeRouteNoWriterMode::default(),
             general.me_route_no_writer_wait_ms,
+            general.me_route_hybrid_max_wait_ms,
+            general.me_route_blocking_send_timeout_ms,
             general.me_route_inline_recovery_attempts,
             general.me_route_inline_recovery_wait_ms,
         );

View File

@@ -103,6 +103,8 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
             general.me_warn_rate_limit_ms,
             MeRouteNoWriterMode::default(),
             general.me_route_no_writer_wait_ms,
+            general.me_route_hybrid_max_wait_ms,
+            general.me_route_blocking_send_timeout_ms,
             general.me_route_inline_recovery_attempts,
             general.me_route_inline_recovery_wait_ms,
         )

View File

@@ -193,6 +193,8 @@ pub struct MePool {
     pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>,
     pub(super) me_route_no_writer_mode: AtomicU8,
     pub(super) me_route_no_writer_wait: Duration,
+    pub(super) me_route_hybrid_max_wait: Duration,
+    pub(super) me_route_blocking_send_timeout: Duration,
     pub(super) me_route_inline_recovery_attempts: u32,
     pub(super) me_route_inline_recovery_wait: Duration,
     pub(super) me_health_interval_ms_unhealthy: AtomicU64,
@@ -307,6 +309,8 @@ impl MePool {
         me_warn_rate_limit_ms: u64,
         me_route_no_writer_mode: MeRouteNoWriterMode,
         me_route_no_writer_wait_ms: u64,
+        me_route_hybrid_max_wait_ms: u64,
+        me_route_blocking_send_timeout_ms: u64,
         me_route_inline_recovery_attempts: u32,
         me_route_inline_recovery_wait_ms: u64,
     ) -> Arc<Self> {
@@ -490,6 +494,10 @@ impl MePool {
             me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)),
             me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
             me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
+            me_route_hybrid_max_wait: Duration::from_millis(me_route_hybrid_max_wait_ms),
+            me_route_blocking_send_timeout: Duration::from_millis(
+                me_route_blocking_send_timeout_ms,
+            ),
             me_route_inline_recovery_attempts,
             me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
             me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),

View File

@@ -312,41 +312,28 @@ impl MePool {
                 let mut p = Vec::with_capacity(12);
                 p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
                 p.extend_from_slice(&sent_id.to_le_bytes());
-                {
-                    let mut tracker = ping_tracker_ping.lock().await;
-                    let now_epoch_ms = std::time::SystemTime::now()
-                        .duration_since(std::time::UNIX_EPOCH)
-                        .unwrap_or_default()
-                        .as_millis() as u64;
-                    let mut run_cleanup = false;
-                    if let Some(pool) = pool_ping.upgrade() {
-                        let last_cleanup_ms = pool
-                            .ping_tracker_last_cleanup_epoch_ms
-                            .load(Ordering::Relaxed);
-                        if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000
-                            && pool
-                                .ping_tracker_last_cleanup_epoch_ms
-                                .compare_exchange(
-                                    last_cleanup_ms,
-                                    now_epoch_ms,
-                                    Ordering::AcqRel,
-                                    Ordering::Relaxed,
-                                )
-                                .is_ok()
-                        {
-                            run_cleanup = true;
-                        }
-                    }
-                    if run_cleanup {
-                        let before = tracker.len();
-                        tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
-                        let expired = before.saturating_sub(tracker.len());
-                        if expired > 0 {
-                            stats_ping.increment_me_keepalive_timeout_by(expired as u64);
-                        }
-                    }
-                    tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
-                }
+                let now_epoch_ms = std::time::SystemTime::now()
+                    .duration_since(std::time::UNIX_EPOCH)
+                    .unwrap_or_default()
+                    .as_millis() as u64;
+                let mut run_cleanup = false;
+                if let Some(pool) = pool_ping.upgrade() {
+                    let last_cleanup_ms = pool
+                        .ping_tracker_last_cleanup_epoch_ms
+                        .load(Ordering::Relaxed);
+                    if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000
+                        && pool
+                            .ping_tracker_last_cleanup_epoch_ms
+                            .compare_exchange(
+                                last_cleanup_ms,
+                                now_epoch_ms,
+                                Ordering::AcqRel,
+                                Ordering::Relaxed,
+                            )
+                            .is_ok()
+                    {
+                        run_cleanup = true;
+                    }
+                }
                 ping_id = ping_id.wrapping_add(1);
                 stats_ping.increment_me_keepalive_sent();
@@ -367,6 +354,16 @@ impl MePool {
                     }
                     break;
                 }
+                let mut tracker = ping_tracker_ping.lock().await;
+                if run_cleanup {
+                    let before = tracker.len();
+                    tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
+                    let expired = before.saturating_sub(tracker.len());
+                    if expired > 0 {
+                        stats_ping.increment_me_keepalive_timeout_by(expired as u64);
+                    }
+                }
+                tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
             }
         });
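
This restructuring shortens the `ping_tracker` critical section: the once-per-30s cleanup election now runs on an atomic outside the mutex, and the lock is taken only after the ping is written, for the sweep plus insert. The election itself reduces to a load plus `compare_exchange`, so concurrent ping loops cannot double-run the sweep:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Returns true for at most one caller per 30s window: the CAS guarantees a
/// single winner even if several tasks observe the same stale timestamp.
fn should_run_cleanup(last_cleanup_epoch_ms: &AtomicU64, now_epoch_ms: u64) -> bool {
    let last = last_cleanup_epoch_ms.load(Ordering::Relaxed);
    now_epoch_ms.saturating_sub(last) >= 30_000
        && last_cleanup_epoch_ms
            .compare_exchange(last, now_epoch_ms, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
}

fn main() {
    let last = AtomicU64::new(0);
    assert!(should_run_cleanup(&last, 31_000)); // first caller wins the window
    assert!(!should_run_cleanup(&last, 31_000)); // a racing second caller skips
    assert!(should_run_cleanup(&last, 62_000)); // the next window elects again
}
```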

View File

@@ -6,6 +6,7 @@ use std::sync::atomic::Ordering;
 use std::time::{Duration, Instant};
 use bytes::Bytes;
+use tokio::sync::mpsc;
 use tokio::sync::mpsc::error::TrySendError;
 use tracing::{debug, warn};
@@ -29,6 +30,29 @@ const PICK_PENALTY_DRAINING: u64 = 600;
 const PICK_PENALTY_STALE: u64 = 300;
 const PICK_PENALTY_DEGRADED: u64 = 250;
+enum TimedSendError<T> {
+    Closed(T),
+    Timeout(T),
+}
+async fn send_writer_command_with_timeout(
+    tx: &mpsc::Sender<WriterCommand>,
+    cmd: WriterCommand,
+    timeout: Duration,
+) -> std::result::Result<(), TimedSendError<WriterCommand>> {
+    if timeout.is_zero() {
+        return tx.send(cmd).await.map_err(|err| TimedSendError::Closed(err.0));
+    }
+    match tokio::time::timeout(timeout, tx.reserve()).await {
+        Ok(Ok(permit)) => {
+            permit.send(cmd);
+            Ok(())
+        }
+        Ok(Err(_)) => Err(TimedSendError::Closed(cmd)),
+        Err(_) => Err(TimedSendError::Timeout(cmd)),
+    }
+}
 impl MePool {
     /// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
     pub async fn send_proxy_req(
@@ -78,8 +102,18 @@ impl MePool {
         let mut hybrid_last_recovery_at: Option<Instant> = None;
         let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50));
         let mut hybrid_wait_current = hybrid_wait_step;
+        let hybrid_deadline = Instant::now() + self.me_route_hybrid_max_wait;
         loop {
+            if matches!(no_writer_mode, MeRouteNoWriterMode::HybridAsyncPersistent)
+                && Instant::now() >= hybrid_deadline
+            {
+                self.stats.increment_me_no_writer_failfast_total();
+                return Err(ProxyError::Proxy(
+                    "No ME writer available in hybrid wait window".into(),
+                ));
+            }
+            let mut skip_writer_id: Option<u64> = None;
             let current_meta = self
                 .registry
                 .get_meta(conn_id)
@@ -90,12 +124,30 @@
                 match current.tx.try_send(WriterCommand::Data(current_payload.clone())) {
                     Ok(()) => return Ok(()),
                     Err(TrySendError::Full(cmd)) => {
-                        if current.tx.send(cmd).await.is_ok() {
-                            return Ok(());
+                        match send_writer_command_with_timeout(
+                            &current.tx,
+                            cmd,
+                            self.me_route_blocking_send_timeout,
+                        )
+                        .await
+                        {
+                            Ok(()) => return Ok(()),
+                            Err(TimedSendError::Closed(_)) => {
+                                warn!(writer_id = current.writer_id, "ME writer channel closed");
+                                self.remove_writer_and_close_clients(current.writer_id).await;
+                                continue;
+                            }
+                            Err(TimedSendError::Timeout(_)) => {
+                                debug!(
+                                    conn_id,
+                                    writer_id = current.writer_id,
+                                    timeout_ms = self.me_route_blocking_send_timeout.as_millis()
+                                        as u64,
+                                    "ME writer send timed out for bound writer, trying reroute"
+                                );
+                                skip_writer_id = Some(current.writer_id);
+                            }
                         }
-                        warn!(writer_id = current.writer_id, "ME writer channel closed");
-                        self.remove_writer_and_close_clients(current.writer_id).await;
-                        continue;
                     }
                     Err(TrySendError::Closed(_)) => {
                         warn!(writer_id = current.writer_id, "ME writer channel closed");
@@ -200,6 +252,9 @@ impl MePool {
                     .candidate_indices_for_dc(&writers_snapshot, routed_dc, true)
                     .await;
             }
+            if let Some(skip_writer_id) = skip_writer_id {
+                candidate_indices.retain(|idx| writers_snapshot[*idx].id != skip_writer_id);
+            }
             if candidate_indices.is_empty() {
                 let pick_mode = self.writer_pick_mode();
                 match no_writer_mode {
@@ -422,7 +477,13 @@ impl MePool {
                 self.stats.increment_me_writer_pick_blocking_fallback_total();
                 let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
                 let (payload, meta) = build_routed_payload(effective_our_addr);
-                match w.tx.send(WriterCommand::Data(payload.clone())).await {
+                match send_writer_command_with_timeout(
+                    &w.tx,
+                    WriterCommand::Data(payload.clone()),
+                    self.me_route_blocking_send_timeout,
+                )
+                .await
+                {
                     Ok(()) => {
                         self.stats
                             .increment_me_writer_pick_success_fallback_total(pick_mode);
@@ -439,11 +500,20 @@
                         }
                         return Ok(());
                     }
-                    Err(_) => {
+                    Err(TimedSendError::Closed(_)) => {
                         self.stats.increment_me_writer_pick_closed_total(pick_mode);
                         warn!(writer_id = w.id, "ME writer channel closed (blocking)");
                         self.remove_writer_and_close_clients(w.id).await;
                     }
+                    Err(TimedSendError::Timeout(_)) => {
+                        self.stats.increment_me_writer_pick_full_total(pick_mode);
+                        debug!(
+                            conn_id,
+                            writer_id = w.id,
+                            timeout_ms = self.me_route_blocking_send_timeout.as_millis() as u64,
+                            "ME writer blocking fallback send timed out"
+                        );
+                    }
                 }
             }
         }
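
The value of the new helper is in the error split: `Closed` means the writer task is gone, so the pool evicts it and closes its clients, while `Timeout` means the writer is merely saturated, so the caller skips it (`skip_writer_id`) and reroutes. A self-contained sketch of the same helper with generic stand-ins in place of `WriterCommand`:

```rust
use std::time::Duration;

use tokio::sync::mpsc;

enum TimedSendError<T> {
    Closed(T),
    Timeout(T),
}

async fn send_with_timeout<T>(
    tx: &mpsc::Sender<T>,
    value: T,
    timeout: Duration,
) -> Result<(), TimedSendError<T>> {
    // Zero keeps the legacy unbounded send.
    if timeout.is_zero() {
        return tx.send(value).await.map_err(|e| TimedSendError::Closed(e.0));
    }
    // Bound only the wait for capacity; the send itself is then instant.
    match tokio::time::timeout(timeout, tx.reserve()).await {
        Ok(Ok(permit)) => {
            permit.send(value);
            Ok(())
        }
        Ok(Err(_)) => Err(TimedSendError::Closed(value)),
        Err(_) => Err(TimedSendError::Timeout(value)),
    }
}

#[tokio::main]
async fn main() {
    // Full but open channel: the bounded wait elapses -> Timeout (reroute, keep writer).
    let (tx, _rx) = mpsc::channel::<u8>(1);
    tx.try_send(0).unwrap();
    assert!(matches!(
        send_with_timeout(&tx, 1, Duration::from_millis(20)).await,
        Err(TimedSendError::Timeout(_))
    ));

    // Dropped receiver -> Closed (evict the writer and close its clients).
    let (tx, rx) = mpsc::channel::<u8>(1);
    drop(rx);
    assert!(matches!(
        send_with_timeout(&tx, 1, Duration::from_millis(20)).await,
        Err(TimedSendError::Closed(_))
    ));
}
```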