mirror of https://github.com/telemt/telemt.git
Runtime guardrails: merge pull request #484 from telemt/flow-runtime
Runtime guardrails
This commit is contained in:
commit
53f691ad4f
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.3.22"
|
version = "3.3.23"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|
|
||||||
|
|
@ -36,12 +36,16 @@ const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000;
|
||||||
const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000;
|
const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000;
|
||||||
const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000;
|
const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000;
|
||||||
const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000;
|
const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000;
|
||||||
|
const DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS: u64 = 3000;
|
||||||
|
const DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS: u64 = 250;
|
||||||
|
const DEFAULT_ME_C2ME_SEND_TIMEOUT_MS: u64 = 4000;
|
||||||
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true;
|
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true;
|
||||||
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30;
|
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30;
|
||||||
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1;
|
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1;
|
||||||
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8;
|
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8;
|
||||||
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000;
|
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000;
|
||||||
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
|
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
|
||||||
|
const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250;
|
||||||
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
|
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
|
||||||
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
|
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
|
||||||
const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000;
|
const DEFAULT_UPSTREAM_CONNECT_BUDGET_MS: u64 = 3000;
|
||||||
|
|
@ -156,6 +160,10 @@ pub(crate) fn default_server_max_connections() -> u32 {
|
||||||
10_000
|
10_000
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_accept_permit_timeout_ms() -> u64 {
|
||||||
|
DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_prefer_4() -> u8 {
|
pub(crate) fn default_prefer_4() -> u8 {
|
||||||
4
|
4
|
||||||
}
|
}
|
||||||
|
|
@ -380,6 +388,18 @@ pub(crate) fn default_me_warn_rate_limit_ms() -> u64 {
|
||||||
DEFAULT_ME_WARN_RATE_LIMIT_MS
|
DEFAULT_ME_WARN_RATE_LIMIT_MS
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_route_hybrid_max_wait_ms() -> u64 {
|
||||||
|
DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_route_blocking_send_timeout_ms() -> u64 {
|
||||||
|
DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_c2me_send_timeout_ms() -> u64 {
|
||||||
|
DEFAULT_ME_C2ME_SEND_TIMEOUT_MS
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
|
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
|
||||||
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
|
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -612,6 +612,8 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
|
||||||
|| old.server.listen_tcp != new.server.listen_tcp
|
|| old.server.listen_tcp != new.server.listen_tcp
|
||||||
|| old.server.listen_unix_sock != new.server.listen_unix_sock
|
|| old.server.listen_unix_sock != new.server.listen_unix_sock
|
||||||
|| old.server.listen_unix_sock_perm != new.server.listen_unix_sock_perm
|
|| old.server.listen_unix_sock_perm != new.server.listen_unix_sock_perm
|
||||||
|
|| old.server.max_connections != new.server.max_connections
|
||||||
|
|| old.server.accept_permit_timeout_ms != new.server.accept_permit_timeout_ms
|
||||||
{
|
{
|
||||||
warned = true;
|
warned = true;
|
||||||
warn!("config reload: server listener settings changed; restart required");
|
warn!("config reload: server listener settings changed; restart required");
|
||||||
|
|
@ -671,6 +673,9 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
|
||||||
}
|
}
|
||||||
if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode
|
if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode
|
||||||
|| old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms
|
|| old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms
|
||||||
|
|| old.general.me_route_hybrid_max_wait_ms != new.general.me_route_hybrid_max_wait_ms
|
||||||
|
|| old.general.me_route_blocking_send_timeout_ms
|
||||||
|
!= new.general.me_route_blocking_send_timeout_ms
|
||||||
|| old.general.me_route_inline_recovery_attempts
|
|| old.general.me_route_inline_recovery_attempts
|
||||||
!= new.general.me_route_inline_recovery_attempts
|
!= new.general.me_route_inline_recovery_attempts
|
||||||
|| old.general.me_route_inline_recovery_wait_ms
|
|| old.general.me_route_inline_recovery_wait_ms
|
||||||
|
|
@ -679,6 +684,10 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
|
||||||
warned = true;
|
warned = true;
|
||||||
warn!("config reload: general.me_route_no_writer_* changed; restart required");
|
warn!("config reload: general.me_route_no_writer_* changed; restart required");
|
||||||
}
|
}
|
||||||
|
if old.general.me_c2me_send_timeout_ms != new.general.me_c2me_send_timeout_ms {
|
||||||
|
warned = true;
|
||||||
|
warn!("config reload: general.me_c2me_send_timeout_ms changed; restart required");
|
||||||
|
}
|
||||||
if old.general.unknown_dc_log_path != new.general.unknown_dc_log_path
|
if old.general.unknown_dc_log_path != new.general.unknown_dc_log_path
|
||||||
|| old.general.unknown_dc_file_log_enabled != new.general.unknown_dc_file_log_enabled
|
|| old.general.unknown_dc_file_log_enabled != new.general.unknown_dc_file_log_enabled
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -346,6 +346,12 @@ impl ProxyConfig {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.general.me_c2me_send_timeout_ms > 60_000 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_c2me_send_timeout_ms must be within [0, 60000]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if config.general.me_reader_route_data_wait_ms > 20 {
|
if config.general.me_reader_route_data_wait_ms > 20 {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
"general.me_reader_route_data_wait_ms must be within [0, 20]".to_string(),
|
"general.me_reader_route_data_wait_ms must be within [0, 20]".to_string(),
|
||||||
|
|
@ -627,6 +633,18 @@ impl ProxyConfig {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !(50..=60_000).contains(&config.general.me_route_hybrid_max_wait_ms) {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_route_hybrid_max_wait_ms must be within [50, 60000]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.general.me_route_blocking_send_timeout_ms > 5000 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_route_blocking_send_timeout_ms must be within [0, 5000]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if !(2..=4).contains(&config.general.me_writer_pick_sample_size) {
|
if !(2..=4).contains(&config.general.me_writer_pick_sample_size) {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
"general.me_writer_pick_sample_size must be within [2, 4]".to_string(),
|
"general.me_writer_pick_sample_size must be within [2, 4]".to_string(),
|
||||||
|
|
@ -687,6 +705,12 @@ impl ProxyConfig {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.server.accept_permit_timeout_ms > 60_000 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"server.accept_permit_timeout_ms must be within [0, 60000]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if config.general.effective_me_pool_force_close_secs() > 0
|
if config.general.effective_me_pool_force_close_secs() > 0
|
||||||
&& config.general.effective_me_pool_force_close_secs()
|
&& config.general.effective_me_pool_force_close_secs()
|
||||||
< config.general.me_pool_drain_ttl_secs
|
< config.general.me_pool_drain_ttl_secs
|
||||||
|
|
|
||||||
|
|
@ -462,6 +462,11 @@ pub struct GeneralConfig {
|
||||||
#[serde(default = "default_me_c2me_channel_capacity")]
|
#[serde(default = "default_me_c2me_channel_capacity")]
|
||||||
pub me_c2me_channel_capacity: usize,
|
pub me_c2me_channel_capacity: usize,
|
||||||
|
|
||||||
|
/// Maximum wait in milliseconds for enqueueing C2ME commands when the queue is full.
|
||||||
|
/// `0` keeps legacy unbounded wait behavior.
|
||||||
|
#[serde(default = "default_me_c2me_send_timeout_ms")]
|
||||||
|
pub me_c2me_send_timeout_ms: u64,
|
||||||
|
|
||||||
/// Bounded wait in milliseconds for routing ME DATA to per-connection queue.
|
/// Bounded wait in milliseconds for routing ME DATA to per-connection queue.
|
||||||
/// `0` keeps legacy no-wait behavior.
|
/// `0` keeps legacy no-wait behavior.
|
||||||
#[serde(default = "default_me_reader_route_data_wait_ms")]
|
#[serde(default = "default_me_reader_route_data_wait_ms")]
|
||||||
|
|
@ -716,6 +721,15 @@ pub struct GeneralConfig {
|
||||||
#[serde(default = "default_me_route_no_writer_wait_ms")]
|
#[serde(default = "default_me_route_no_writer_wait_ms")]
|
||||||
pub me_route_no_writer_wait_ms: u64,
|
pub me_route_no_writer_wait_ms: u64,
|
||||||
|
|
||||||
|
/// Maximum cumulative wait in milliseconds for hybrid no-writer mode before failfast.
|
||||||
|
#[serde(default = "default_me_route_hybrid_max_wait_ms")]
|
||||||
|
pub me_route_hybrid_max_wait_ms: u64,
|
||||||
|
|
||||||
|
/// Maximum wait in milliseconds for blocking ME writer channel send fallback.
|
||||||
|
/// `0` keeps legacy unbounded wait behavior.
|
||||||
|
#[serde(default = "default_me_route_blocking_send_timeout_ms")]
|
||||||
|
pub me_route_blocking_send_timeout_ms: u64,
|
||||||
|
|
||||||
/// Number of inline recovery attempts in legacy mode.
|
/// Number of inline recovery attempts in legacy mode.
|
||||||
#[serde(default = "default_me_route_inline_recovery_attempts")]
|
#[serde(default = "default_me_route_inline_recovery_attempts")]
|
||||||
pub me_route_inline_recovery_attempts: u32,
|
pub me_route_inline_recovery_attempts: u32,
|
||||||
|
|
@ -921,6 +935,7 @@ impl Default for GeneralConfig {
|
||||||
me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(),
|
me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(),
|
||||||
me_route_channel_capacity: default_me_route_channel_capacity(),
|
me_route_channel_capacity: default_me_route_channel_capacity(),
|
||||||
me_c2me_channel_capacity: default_me_c2me_channel_capacity(),
|
me_c2me_channel_capacity: default_me_c2me_channel_capacity(),
|
||||||
|
me_c2me_send_timeout_ms: default_me_c2me_send_timeout_ms(),
|
||||||
me_reader_route_data_wait_ms: default_me_reader_route_data_wait_ms(),
|
me_reader_route_data_wait_ms: default_me_reader_route_data_wait_ms(),
|
||||||
me_d2c_flush_batch_max_frames: default_me_d2c_flush_batch_max_frames(),
|
me_d2c_flush_batch_max_frames: default_me_d2c_flush_batch_max_frames(),
|
||||||
me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(),
|
me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(),
|
||||||
|
|
@ -975,6 +990,8 @@ impl Default for GeneralConfig {
|
||||||
me_warn_rate_limit_ms: default_me_warn_rate_limit_ms(),
|
me_warn_rate_limit_ms: default_me_warn_rate_limit_ms(),
|
||||||
me_route_no_writer_mode: MeRouteNoWriterMode::default(),
|
me_route_no_writer_mode: MeRouteNoWriterMode::default(),
|
||||||
me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(),
|
me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(),
|
||||||
|
me_route_hybrid_max_wait_ms: default_me_route_hybrid_max_wait_ms(),
|
||||||
|
me_route_blocking_send_timeout_ms: default_me_route_blocking_send_timeout_ms(),
|
||||||
me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(),
|
me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(),
|
||||||
me_route_inline_recovery_wait_ms: default_me_route_inline_recovery_wait_ms(),
|
me_route_inline_recovery_wait_ms: default_me_route_inline_recovery_wait_ms(),
|
||||||
links: LinksConfig::default(),
|
links: LinksConfig::default(),
|
||||||
|
|
@ -1207,6 +1224,11 @@ pub struct ServerConfig {
|
||||||
/// 0 means unlimited.
|
/// 0 means unlimited.
|
||||||
#[serde(default = "default_server_max_connections")]
|
#[serde(default = "default_server_max_connections")]
|
||||||
pub max_connections: u32,
|
pub max_connections: u32,
|
||||||
|
|
||||||
|
/// Maximum wait in milliseconds while acquiring a connection slot permit.
|
||||||
|
/// `0` keeps legacy unbounded wait behavior.
|
||||||
|
#[serde(default = "default_accept_permit_timeout_ms")]
|
||||||
|
pub accept_permit_timeout_ms: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for ServerConfig {
|
impl Default for ServerConfig {
|
||||||
|
|
@ -1226,6 +1248,7 @@ impl Default for ServerConfig {
|
||||||
api: ApiConfig::default(),
|
api: ApiConfig::default(),
|
||||||
listeners: Vec::new(),
|
listeners: Vec::new(),
|
||||||
max_connections: default_server_max_connections(),
|
max_connections: default_server_max_connections(),
|
||||||
|
accept_permit_timeout_ms: default_accept_permit_timeout_ms(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -205,6 +205,7 @@ pub(crate) fn format_uptime(total_secs: u64) -> String {
|
||||||
format!("{} / {} seconds", parts.join(", "), total_secs)
|
format!("{} / {} seconds", parts.join(", "), total_secs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
pub(crate) async fn wait_until_admission_open(admission_rx: &mut watch::Receiver<bool>) -> bool {
|
pub(crate) async fn wait_until_admission_open(admission_rx: &mut watch::Receiver<bool>) -> bool {
|
||||||
loop {
|
loop {
|
||||||
if *admission_rx.borrow() {
|
if *admission_rx.borrow() {
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ use crate::transport::{
|
||||||
ListenOptions, UpstreamManager, create_listener, find_listener_processes,
|
ListenOptions, UpstreamManager, create_listener, find_listener_processes,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::helpers::{is_expected_handshake_eof, print_proxy_links, wait_until_admission_open};
|
use super::helpers::{is_expected_handshake_eof, print_proxy_links};
|
||||||
|
|
||||||
pub(crate) struct BoundListeners {
|
pub(crate) struct BoundListeners {
|
||||||
pub(crate) listeners: Vec<(TcpListener, bool)>,
|
pub(crate) listeners: Vec<(TcpListener, bool)>,
|
||||||
|
|
@ -195,7 +195,7 @@ pub(crate) async fn bind_listeners(
|
||||||
has_unix_listener = true;
|
has_unix_listener = true;
|
||||||
|
|
||||||
let mut config_rx_unix: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
|
let mut config_rx_unix: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
|
||||||
let mut admission_rx_unix = admission_rx.clone();
|
let admission_rx_unix = admission_rx.clone();
|
||||||
let stats = stats.clone();
|
let stats = stats.clone();
|
||||||
let upstream_manager = upstream_manager.clone();
|
let upstream_manager = upstream_manager.clone();
|
||||||
let replay_checker = replay_checker.clone();
|
let replay_checker = replay_checker.clone();
|
||||||
|
|
@ -212,18 +212,45 @@ pub(crate) async fn bind_listeners(
|
||||||
let unix_conn_counter = Arc::new(std::sync::atomic::AtomicU64::new(1));
|
let unix_conn_counter = Arc::new(std::sync::atomic::AtomicU64::new(1));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if !wait_until_admission_open(&mut admission_rx_unix).await {
|
|
||||||
warn!("Conditional-admission gate channel closed for unix listener");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
match unix_listener.accept().await {
|
match unix_listener.accept().await {
|
||||||
Ok((stream, _)) => {
|
Ok((stream, _)) => {
|
||||||
let permit = match max_connections_unix.clone().acquire_owned().await {
|
if !*admission_rx_unix.borrow() {
|
||||||
|
drop(stream);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let accept_permit_timeout_ms = config_rx_unix
|
||||||
|
.borrow()
|
||||||
|
.server
|
||||||
|
.accept_permit_timeout_ms;
|
||||||
|
let permit = if accept_permit_timeout_ms == 0 {
|
||||||
|
match max_connections_unix.clone().acquire_owned().await {
|
||||||
Ok(permit) => permit,
|
Ok(permit) => permit,
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
error!("Connection limiter is closed");
|
error!("Connection limiter is closed");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
match tokio::time::timeout(
|
||||||
|
Duration::from_millis(accept_permit_timeout_ms),
|
||||||
|
max_connections_unix.clone().acquire_owned(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(Ok(permit)) => permit,
|
||||||
|
Ok(Err(_)) => {
|
||||||
|
error!("Connection limiter is closed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
debug!(
|
||||||
|
timeout_ms = accept_permit_timeout_ms,
|
||||||
|
"Dropping accepted unix connection: permit wait timeout"
|
||||||
|
);
|
||||||
|
drop(stream);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
let conn_id =
|
let conn_id =
|
||||||
unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
|
@ -312,7 +339,7 @@ pub(crate) fn spawn_tcp_accept_loops(
|
||||||
) {
|
) {
|
||||||
for (listener, listener_proxy_protocol) in listeners {
|
for (listener, listener_proxy_protocol) in listeners {
|
||||||
let mut config_rx: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
|
let mut config_rx: watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
|
||||||
let mut admission_rx_tcp = admission_rx.clone();
|
let admission_rx_tcp = admission_rx.clone();
|
||||||
let stats = stats.clone();
|
let stats = stats.clone();
|
||||||
let upstream_manager = upstream_manager.clone();
|
let upstream_manager = upstream_manager.clone();
|
||||||
let replay_checker = replay_checker.clone();
|
let replay_checker = replay_checker.clone();
|
||||||
|
|
@ -327,18 +354,47 @@ pub(crate) fn spawn_tcp_accept_loops(
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
if !wait_until_admission_open(&mut admission_rx_tcp).await {
|
|
||||||
warn!("Conditional-admission gate channel closed for tcp listener");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
match listener.accept().await {
|
match listener.accept().await {
|
||||||
Ok((stream, peer_addr)) => {
|
Ok((stream, peer_addr)) => {
|
||||||
let permit = match max_connections_tcp.clone().acquire_owned().await {
|
if !*admission_rx_tcp.borrow() {
|
||||||
|
debug!(peer = %peer_addr, "Admission gate closed, dropping connection");
|
||||||
|
drop(stream);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let accept_permit_timeout_ms = config_rx
|
||||||
|
.borrow()
|
||||||
|
.server
|
||||||
|
.accept_permit_timeout_ms;
|
||||||
|
let permit = if accept_permit_timeout_ms == 0 {
|
||||||
|
match max_connections_tcp.clone().acquire_owned().await {
|
||||||
Ok(permit) => permit,
|
Ok(permit) => permit,
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
error!("Connection limiter is closed");
|
error!("Connection limiter is closed");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
match tokio::time::timeout(
|
||||||
|
Duration::from_millis(accept_permit_timeout_ms),
|
||||||
|
max_connections_tcp.clone().acquire_owned(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(Ok(permit)) => permit,
|
||||||
|
Ok(Err(_)) => {
|
||||||
|
error!("Connection limiter is closed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
debug!(
|
||||||
|
peer = %peer_addr,
|
||||||
|
timeout_ms = accept_permit_timeout_ms,
|
||||||
|
"Dropping accepted connection: permit wait timeout"
|
||||||
|
);
|
||||||
|
drop(stream);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
let config = config_rx.borrow_and_update().clone();
|
let config = config_rx.borrow_and_update().clone();
|
||||||
let stats = stats.clone();
|
let stats = stats.clone();
|
||||||
|
|
|
||||||
|
|
@ -267,6 +267,8 @@ pub(crate) async fn initialize_me_pool(
|
||||||
config.general.me_warn_rate_limit_ms,
|
config.general.me_warn_rate_limit_ms,
|
||||||
config.general.me_route_no_writer_mode,
|
config.general.me_route_no_writer_mode,
|
||||||
config.general.me_route_no_writer_wait_ms,
|
config.general.me_route_no_writer_wait_ms,
|
||||||
|
config.general.me_route_hybrid_max_wait_ms,
|
||||||
|
config.general.me_route_blocking_send_timeout_ms,
|
||||||
config.general.me_route_inline_recovery_attempts,
|
config.general.me_route_inline_recovery_attempts,
|
||||||
config.general.me_route_inline_recovery_wait_ms,
|
config.general.me_route_inline_recovery_wait_ms,
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -222,6 +222,7 @@ fn should_yield_c2me_sender(sent_since_yield: usize, has_backlog: bool) -> bool
|
||||||
async fn enqueue_c2me_command(
|
async fn enqueue_c2me_command(
|
||||||
tx: &mpsc::Sender<C2MeCommand>,
|
tx: &mpsc::Sender<C2MeCommand>,
|
||||||
cmd: C2MeCommand,
|
cmd: C2MeCommand,
|
||||||
|
send_timeout: Duration,
|
||||||
) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
|
) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
|
||||||
match tx.try_send(cmd) {
|
match tx.try_send(cmd) {
|
||||||
Ok(()) => Ok(()),
|
Ok(()) => Ok(()),
|
||||||
|
|
@ -231,7 +232,17 @@ async fn enqueue_c2me_command(
|
||||||
if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
|
if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
}
|
}
|
||||||
tx.send(cmd).await
|
if send_timeout.is_zero() {
|
||||||
|
return tx.send(cmd).await;
|
||||||
|
}
|
||||||
|
match tokio::time::timeout(send_timeout, tx.reserve()).await {
|
||||||
|
Ok(Ok(permit)) => {
|
||||||
|
permit.send(cmd);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Ok(Err(_)) => Err(mpsc::error::SendError(cmd)),
|
||||||
|
Err(_) => Err(mpsc::error::SendError(cmd)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -355,6 +366,7 @@ where
|
||||||
.general
|
.general
|
||||||
.me_c2me_channel_capacity
|
.me_c2me_channel_capacity
|
||||||
.max(C2ME_CHANNEL_CAPACITY_FALLBACK);
|
.max(C2ME_CHANNEL_CAPACITY_FALLBACK);
|
||||||
|
let c2me_send_timeout = Duration::from_millis(config.general.me_c2me_send_timeout_ms);
|
||||||
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
|
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
|
||||||
let me_pool_c2me = me_pool.clone();
|
let me_pool_c2me = me_pool.clone();
|
||||||
let effective_tag = effective_tag;
|
let effective_tag = effective_tag;
|
||||||
|
|
@ -363,6 +375,21 @@ where
|
||||||
while let Some(cmd) = c2me_rx.recv().await {
|
while let Some(cmd) = c2me_rx.recv().await {
|
||||||
match cmd {
|
match cmd {
|
||||||
C2MeCommand::Data { payload, flags } => {
|
C2MeCommand::Data { payload, flags } => {
|
||||||
|
if c2me_send_timeout.is_zero() {
|
||||||
|
me_pool_c2me
|
||||||
|
.send_proxy_req(
|
||||||
|
conn_id,
|
||||||
|
success.dc_idx,
|
||||||
|
peer,
|
||||||
|
translated_local_addr,
|
||||||
|
payload.as_ref(),
|
||||||
|
flags,
|
||||||
|
effective_tag.as_deref(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
} else {
|
||||||
|
match tokio::time::timeout(
|
||||||
|
c2me_send_timeout,
|
||||||
me_pool_c2me.send_proxy_req(
|
me_pool_c2me.send_proxy_req(
|
||||||
conn_id,
|
conn_id,
|
||||||
success.dc_idx,
|
success.dc_idx,
|
||||||
|
|
@ -371,7 +398,19 @@ where
|
||||||
payload.as_ref(),
|
payload.as_ref(),
|
||||||
flags,
|
flags,
|
||||||
effective_tag.as_deref(),
|
effective_tag.as_deref(),
|
||||||
).await?;
|
),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(send_result) => send_result?,
|
||||||
|
Err(_) => {
|
||||||
|
return Err(ProxyError::Proxy(format!(
|
||||||
|
"ME send timeout after {}ms",
|
||||||
|
c2me_send_timeout.as_millis()
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
sent_since_yield = sent_since_yield.saturating_add(1);
|
sent_since_yield = sent_since_yield.saturating_add(1);
|
||||||
if should_yield_c2me_sender(sent_since_yield, !c2me_rx.is_empty()) {
|
if should_yield_c2me_sender(sent_since_yield, !c2me_rx.is_empty()) {
|
||||||
sent_since_yield = 0;
|
sent_since_yield = 0;
|
||||||
|
|
@ -555,7 +594,7 @@ where
|
||||||
loop {
|
loop {
|
||||||
if session_lease.is_stale() {
|
if session_lease.is_stale() {
|
||||||
stats.increment_reconnect_stale_close_total();
|
stats.increment_reconnect_stale_close_total();
|
||||||
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
|
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
|
||||||
main_result = Err(ProxyError::Proxy("Session evicted by reconnect".to_string()));
|
main_result = Err(ProxyError::Proxy("Session evicted by reconnect".to_string()));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
@ -573,7 +612,7 @@ where
|
||||||
"Cutover affected middle session, closing client connection"
|
"Cutover affected middle session, closing client connection"
|
||||||
);
|
);
|
||||||
tokio::time::sleep(delay).await;
|
tokio::time::sleep(delay).await;
|
||||||
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
|
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
|
||||||
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
|
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
@ -607,7 +646,11 @@ where
|
||||||
flags |= RPC_FLAG_NOT_ENCRYPTED;
|
flags |= RPC_FLAG_NOT_ENCRYPTED;
|
||||||
}
|
}
|
||||||
// Keep client read loop lightweight: route heavy ME send path via a dedicated task.
|
// Keep client read loop lightweight: route heavy ME send path via a dedicated task.
|
||||||
if enqueue_c2me_command(&c2me_tx, C2MeCommand::Data { payload, flags })
|
if enqueue_c2me_command(
|
||||||
|
&c2me_tx,
|
||||||
|
C2MeCommand::Data { payload, flags },
|
||||||
|
c2me_send_timeout,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.is_err()
|
.is_err()
|
||||||
{
|
{
|
||||||
|
|
@ -618,7 +661,12 @@ where
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
debug!(conn_id, "Client EOF");
|
debug!(conn_id, "Client EOF");
|
||||||
client_closed = true;
|
client_closed = true;
|
||||||
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
|
let _ = enqueue_c2me_command(
|
||||||
|
&c2me_tx,
|
||||||
|
C2MeCommand::Close,
|
||||||
|
c2me_send_timeout,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
@ -993,6 +1041,7 @@ mod tests {
|
||||||
payload: Bytes::from_static(&[1, 2, 3]),
|
payload: Bytes::from_static(&[1, 2, 3]),
|
||||||
flags: 0,
|
flags: 0,
|
||||||
},
|
},
|
||||||
|
TokioDuration::from_millis(50),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
@ -1028,6 +1077,7 @@ mod tests {
|
||||||
payload: Bytes::from_static(&[7, 7]),
|
payload: Bytes::from_static(&[7, 7]),
|
||||||
flags: 7,
|
flags: 7,
|
||||||
},
|
},
|
||||||
|
TokioDuration::from_millis(100),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
|
||||||
|
|
@ -1574,6 +1574,8 @@ mod tests {
|
||||||
general.me_warn_rate_limit_ms,
|
general.me_warn_rate_limit_ms,
|
||||||
MeRouteNoWriterMode::default(),
|
MeRouteNoWriterMode::default(),
|
||||||
general.me_route_no_writer_wait_ms,
|
general.me_route_no_writer_wait_ms,
|
||||||
|
general.me_route_hybrid_max_wait_ms,
|
||||||
|
general.me_route_blocking_send_timeout_ms,
|
||||||
general.me_route_inline_recovery_attempts,
|
general.me_route_inline_recovery_attempts,
|
||||||
general.me_route_inline_recovery_wait_ms,
|
general.me_route_inline_recovery_wait_ms,
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -111,6 +111,8 @@ async fn make_pool(
|
||||||
general.me_warn_rate_limit_ms,
|
general.me_warn_rate_limit_ms,
|
||||||
MeRouteNoWriterMode::default(),
|
MeRouteNoWriterMode::default(),
|
||||||
general.me_route_no_writer_wait_ms,
|
general.me_route_no_writer_wait_ms,
|
||||||
|
general.me_route_hybrid_max_wait_ms,
|
||||||
|
general.me_route_blocking_send_timeout_ms,
|
||||||
general.me_route_inline_recovery_attempts,
|
general.me_route_inline_recovery_attempts,
|
||||||
general.me_route_inline_recovery_wait_ms,
|
general.me_route_inline_recovery_wait_ms,
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -110,6 +110,8 @@ async fn make_pool(
|
||||||
general.me_warn_rate_limit_ms,
|
general.me_warn_rate_limit_ms,
|
||||||
MeRouteNoWriterMode::default(),
|
MeRouteNoWriterMode::default(),
|
||||||
general.me_route_no_writer_wait_ms,
|
general.me_route_no_writer_wait_ms,
|
||||||
|
general.me_route_hybrid_max_wait_ms,
|
||||||
|
general.me_route_blocking_send_timeout_ms,
|
||||||
general.me_route_inline_recovery_attempts,
|
general.me_route_inline_recovery_attempts,
|
||||||
general.me_route_inline_recovery_wait_ms,
|
general.me_route_inline_recovery_wait_ms,
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -103,6 +103,8 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
|
||||||
general.me_warn_rate_limit_ms,
|
general.me_warn_rate_limit_ms,
|
||||||
MeRouteNoWriterMode::default(),
|
MeRouteNoWriterMode::default(),
|
||||||
general.me_route_no_writer_wait_ms,
|
general.me_route_no_writer_wait_ms,
|
||||||
|
general.me_route_hybrid_max_wait_ms,
|
||||||
|
general.me_route_blocking_send_timeout_ms,
|
||||||
general.me_route_inline_recovery_attempts,
|
general.me_route_inline_recovery_attempts,
|
||||||
general.me_route_inline_recovery_wait_ms,
|
general.me_route_inline_recovery_wait_ms,
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -193,6 +193,8 @@ pub struct MePool {
|
||||||
pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>,
|
pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>,
|
||||||
pub(super) me_route_no_writer_mode: AtomicU8,
|
pub(super) me_route_no_writer_mode: AtomicU8,
|
||||||
pub(super) me_route_no_writer_wait: Duration,
|
pub(super) me_route_no_writer_wait: Duration,
|
||||||
|
pub(super) me_route_hybrid_max_wait: Duration,
|
||||||
|
pub(super) me_route_blocking_send_timeout: Duration,
|
||||||
pub(super) me_route_inline_recovery_attempts: u32,
|
pub(super) me_route_inline_recovery_attempts: u32,
|
||||||
pub(super) me_route_inline_recovery_wait: Duration,
|
pub(super) me_route_inline_recovery_wait: Duration,
|
||||||
pub(super) me_health_interval_ms_unhealthy: AtomicU64,
|
pub(super) me_health_interval_ms_unhealthy: AtomicU64,
|
||||||
|
|
@ -307,6 +309,8 @@ impl MePool {
|
||||||
me_warn_rate_limit_ms: u64,
|
me_warn_rate_limit_ms: u64,
|
||||||
me_route_no_writer_mode: MeRouteNoWriterMode,
|
me_route_no_writer_mode: MeRouteNoWriterMode,
|
||||||
me_route_no_writer_wait_ms: u64,
|
me_route_no_writer_wait_ms: u64,
|
||||||
|
me_route_hybrid_max_wait_ms: u64,
|
||||||
|
me_route_blocking_send_timeout_ms: u64,
|
||||||
me_route_inline_recovery_attempts: u32,
|
me_route_inline_recovery_attempts: u32,
|
||||||
me_route_inline_recovery_wait_ms: u64,
|
me_route_inline_recovery_wait_ms: u64,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
|
|
@ -490,6 +494,10 @@ impl MePool {
|
||||||
me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)),
|
me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)),
|
||||||
me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
|
me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
|
||||||
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
|
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
|
||||||
|
me_route_hybrid_max_wait: Duration::from_millis(me_route_hybrid_max_wait_ms),
|
||||||
|
me_route_blocking_send_timeout: Duration::from_millis(
|
||||||
|
me_route_blocking_send_timeout_ms,
|
||||||
|
),
|
||||||
me_route_inline_recovery_attempts,
|
me_route_inline_recovery_attempts,
|
||||||
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
|
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
|
||||||
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
|
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
|
||||||
|
|
|
||||||
|
|
@ -312,8 +312,6 @@ impl MePool {
|
||||||
let mut p = Vec::with_capacity(12);
|
let mut p = Vec::with_capacity(12);
|
||||||
p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
|
p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
|
||||||
p.extend_from_slice(&sent_id.to_le_bytes());
|
p.extend_from_slice(&sent_id.to_le_bytes());
|
||||||
{
|
|
||||||
let mut tracker = ping_tracker_ping.lock().await;
|
|
||||||
let now_epoch_ms = std::time::SystemTime::now()
|
let now_epoch_ms = std::time::SystemTime::now()
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
|
|
@ -337,17 +335,6 @@ impl MePool {
|
||||||
run_cleanup = true;
|
run_cleanup = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if run_cleanup {
|
|
||||||
let before = tracker.len();
|
|
||||||
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
|
|
||||||
let expired = before.saturating_sub(tracker.len());
|
|
||||||
if expired > 0 {
|
|
||||||
stats_ping.increment_me_keepalive_timeout_by(expired as u64);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
|
|
||||||
}
|
|
||||||
ping_id = ping_id.wrapping_add(1);
|
ping_id = ping_id.wrapping_add(1);
|
||||||
stats_ping.increment_me_keepalive_sent();
|
stats_ping.increment_me_keepalive_sent();
|
||||||
if tx_ping
|
if tx_ping
|
||||||
|
|
@ -367,6 +354,16 @@ impl MePool {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
let mut tracker = ping_tracker_ping.lock().await;
|
||||||
|
if run_cleanup {
|
||||||
|
let before = tracker.len();
|
||||||
|
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
|
||||||
|
let expired = before.saturating_sub(tracker.len());
|
||||||
|
if expired > 0 {
|
||||||
|
stats_ping.increment_me_keepalive_timeout_by(expired as u64);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ use std::sync::atomic::Ordering;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
use tokio::sync::mpsc::error::TrySendError;
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
use tracing::{debug, warn};
|
use tracing::{debug, warn};
|
||||||
|
|
||||||
|
|
@ -29,6 +30,29 @@ const PICK_PENALTY_DRAINING: u64 = 600;
|
||||||
const PICK_PENALTY_STALE: u64 = 300;
|
const PICK_PENALTY_STALE: u64 = 300;
|
||||||
const PICK_PENALTY_DEGRADED: u64 = 250;
|
const PICK_PENALTY_DEGRADED: u64 = 250;
|
||||||
|
|
||||||
|
enum TimedSendError<T> {
|
||||||
|
Closed(T),
|
||||||
|
Timeout(T),
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_writer_command_with_timeout(
|
||||||
|
tx: &mpsc::Sender<WriterCommand>,
|
||||||
|
cmd: WriterCommand,
|
||||||
|
timeout: Duration,
|
||||||
|
) -> std::result::Result<(), TimedSendError<WriterCommand>> {
|
||||||
|
if timeout.is_zero() {
|
||||||
|
return tx.send(cmd).await.map_err(|err| TimedSendError::Closed(err.0));
|
||||||
|
}
|
||||||
|
match tokio::time::timeout(timeout, tx.reserve()).await {
|
||||||
|
Ok(Ok(permit)) => {
|
||||||
|
permit.send(cmd);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Ok(Err(_)) => Err(TimedSendError::Closed(cmd)),
|
||||||
|
Err(_) => Err(TimedSendError::Timeout(cmd)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl MePool {
|
impl MePool {
|
||||||
/// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
|
/// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
|
||||||
pub async fn send_proxy_req(
|
pub async fn send_proxy_req(
|
||||||
|
|
@ -78,8 +102,18 @@ impl MePool {
|
||||||
let mut hybrid_last_recovery_at: Option<Instant> = None;
|
let mut hybrid_last_recovery_at: Option<Instant> = None;
|
||||||
let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50));
|
let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50));
|
||||||
let mut hybrid_wait_current = hybrid_wait_step;
|
let mut hybrid_wait_current = hybrid_wait_step;
|
||||||
|
let hybrid_deadline = Instant::now() + self.me_route_hybrid_max_wait;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
if matches!(no_writer_mode, MeRouteNoWriterMode::HybridAsyncPersistent)
|
||||||
|
&& Instant::now() >= hybrid_deadline
|
||||||
|
{
|
||||||
|
self.stats.increment_me_no_writer_failfast_total();
|
||||||
|
return Err(ProxyError::Proxy(
|
||||||
|
"No ME writer available in hybrid wait window".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let mut skip_writer_id: Option<u64> = None;
|
||||||
let current_meta = self
|
let current_meta = self
|
||||||
.registry
|
.registry
|
||||||
.get_meta(conn_id)
|
.get_meta(conn_id)
|
||||||
|
|
@ -90,13 +124,31 @@ impl MePool {
|
||||||
match current.tx.try_send(WriterCommand::Data(current_payload.clone())) {
|
match current.tx.try_send(WriterCommand::Data(current_payload.clone())) {
|
||||||
Ok(()) => return Ok(()),
|
Ok(()) => return Ok(()),
|
||||||
Err(TrySendError::Full(cmd)) => {
|
Err(TrySendError::Full(cmd)) => {
|
||||||
if current.tx.send(cmd).await.is_ok() {
|
match send_writer_command_with_timeout(
|
||||||
return Ok(());
|
¤t.tx,
|
||||||
}
|
cmd,
|
||||||
|
self.me_route_blocking_send_timeout,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(()) => return Ok(()),
|
||||||
|
Err(TimedSendError::Closed(_)) => {
|
||||||
warn!(writer_id = current.writer_id, "ME writer channel closed");
|
warn!(writer_id = current.writer_id, "ME writer channel closed");
|
||||||
self.remove_writer_and_close_clients(current.writer_id).await;
|
self.remove_writer_and_close_clients(current.writer_id).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
Err(TimedSendError::Timeout(_)) => {
|
||||||
|
debug!(
|
||||||
|
conn_id,
|
||||||
|
writer_id = current.writer_id,
|
||||||
|
timeout_ms = self.me_route_blocking_send_timeout.as_millis()
|
||||||
|
as u64,
|
||||||
|
"ME writer send timed out for bound writer, trying reroute"
|
||||||
|
);
|
||||||
|
skip_writer_id = Some(current.writer_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Err(TrySendError::Closed(_)) => {
|
Err(TrySendError::Closed(_)) => {
|
||||||
warn!(writer_id = current.writer_id, "ME writer channel closed");
|
warn!(writer_id = current.writer_id, "ME writer channel closed");
|
||||||
self.remove_writer_and_close_clients(current.writer_id).await;
|
self.remove_writer_and_close_clients(current.writer_id).await;
|
||||||
|
|
@ -200,6 +252,9 @@ impl MePool {
|
||||||
.candidate_indices_for_dc(&writers_snapshot, routed_dc, true)
|
.candidate_indices_for_dc(&writers_snapshot, routed_dc, true)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
if let Some(skip_writer_id) = skip_writer_id {
|
||||||
|
candidate_indices.retain(|idx| writers_snapshot[*idx].id != skip_writer_id);
|
||||||
|
}
|
||||||
if candidate_indices.is_empty() {
|
if candidate_indices.is_empty() {
|
||||||
let pick_mode = self.writer_pick_mode();
|
let pick_mode = self.writer_pick_mode();
|
||||||
match no_writer_mode {
|
match no_writer_mode {
|
||||||
|
|
@ -422,7 +477,13 @@ impl MePool {
|
||||||
self.stats.increment_me_writer_pick_blocking_fallback_total();
|
self.stats.increment_me_writer_pick_blocking_fallback_total();
|
||||||
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
|
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
|
||||||
let (payload, meta) = build_routed_payload(effective_our_addr);
|
let (payload, meta) = build_routed_payload(effective_our_addr);
|
||||||
match w.tx.send(WriterCommand::Data(payload.clone())).await {
|
match send_writer_command_with_timeout(
|
||||||
|
&w.tx,
|
||||||
|
WriterCommand::Data(payload.clone()),
|
||||||
|
self.me_route_blocking_send_timeout,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
self.stats
|
self.stats
|
||||||
.increment_me_writer_pick_success_fallback_total(pick_mode);
|
.increment_me_writer_pick_success_fallback_total(pick_mode);
|
||||||
|
|
@ -439,11 +500,20 @@ impl MePool {
|
||||||
}
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(TimedSendError::Closed(_)) => {
|
||||||
self.stats.increment_me_writer_pick_closed_total(pick_mode);
|
self.stats.increment_me_writer_pick_closed_total(pick_mode);
|
||||||
warn!(writer_id = w.id, "ME writer channel closed (blocking)");
|
warn!(writer_id = w.id, "ME writer channel closed (blocking)");
|
||||||
self.remove_writer_and_close_clients(w.id).await;
|
self.remove_writer_and_close_clients(w.id).await;
|
||||||
}
|
}
|
||||||
|
Err(TimedSendError::Timeout(_)) => {
|
||||||
|
self.stats.increment_me_writer_pick_full_total(pick_mode);
|
||||||
|
debug!(
|
||||||
|
conn_id,
|
||||||
|
writer_id = w.id,
|
||||||
|
timeout_ms = self.me_route_blocking_send_timeout.as_millis() as u64,
|
||||||
|
"ME writer blocking fallback send timed out"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue