From 09bdafa718db0af448f597250e83cff273298413 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 5 Mar 2026 14:39:32 +0300 Subject: [PATCH] Performance improvements --- src/config/defaults.rs | 16 ++ src/config/hot_reload.rs | 16 ++ src/config/load.rs | 61 ++++++ src/config/types.rs | 50 +++++ src/main.rs | 4 + src/metrics.rs | 65 +++++++ src/proxy/client.rs | 113 +++++------ src/proxy/direct_relay.rs | 12 +- src/stats/mod.rs | 50 +++++ src/transport/middle_proxy/pool.rs | 14 +- src/transport/middle_proxy/send.rs | 300 +++++++++++++++++++++++------ 11 files changed, 584 insertions(+), 117 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index b73013a..15be561 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -129,6 +129,10 @@ pub(crate) fn default_unknown_dc_log_path() -> Option { Some("unknown-dc.txt".to_string()) } +pub(crate) fn default_unknown_dc_file_log_enabled() -> bool { + false +} + pub(crate) fn default_pool_size() -> usize { 8 } @@ -273,6 +277,18 @@ pub(crate) fn default_me_route_backpressure_high_watermark_pct() -> u8 { 80 } +pub(crate) fn default_me_route_no_writer_wait_ms() -> u64 { + 250 +} + +pub(crate) fn default_me_route_inline_recovery_attempts() -> u32 { + 3 +} + +pub(crate) fn default_me_route_inline_recovery_wait_ms() -> u64 { + 3000 +} + pub(crate) fn default_beobachten_minutes() -> u64 { 10 } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index b03f83e..14939a0 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -381,6 +381,22 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b warned = true; warn!("config reload: general.middle_proxy_pool_size changed; restart required"); } + if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode + || old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms + || old.general.me_route_inline_recovery_attempts + != new.general.me_route_inline_recovery_attempts + || old.general.me_route_inline_recovery_wait_ms + != new.general.me_route_inline_recovery_wait_ms + { + warned = true; + warn!("config reload: general.me_route_no_writer_* changed; restart required"); + } + if old.general.unknown_dc_log_path != new.general.unknown_dc_log_path + || old.general.unknown_dc_file_log_enabled != new.general.unknown_dc_file_log_enabled + { + warned = true; + warn!("config reload: general.unknown_dc_* changed; restart required"); + } if old.general.me_init_retry_attempts != new.general.me_init_retry_attempts { warned = true; warn!("config reload: general.me_init_retry_attempts changed; restart required"); diff --git a/src/config/load.rs b/src/config/load.rs index a2ee5f0..2954f04 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -410,6 +410,24 @@ impl ProxyConfig { )); } + if !(10..=5000).contains(&config.general.me_route_no_writer_wait_ms) { + return Err(ProxyError::Config( + "general.me_route_no_writer_wait_ms must be within [10, 5000]".to_string(), + )); + } + + if config.general.me_route_inline_recovery_attempts == 0 { + return Err(ProxyError::Config( + "general.me_route_inline_recovery_attempts must be > 0".to_string(), + )); + } + + if !(10..=30000).contains(&config.general.me_route_inline_recovery_wait_ms) { + return Err(ProxyError::Config( + "general.me_route_inline_recovery_wait_ms must be within [10, 30000]".to_string(), + )); + } + if config.server.api.request_body_limit_bytes == 0 { return Err(ProxyError::Config( "server.api.request_body_limit_bytes must be > 0".to_string(), @@ -1206,6 +1224,49 @@ mod tests { let _ = std::fs::remove_file(path_valid); } + #[test] + fn me_route_no_writer_wait_ms_out_of_range_is_rejected() { + let toml = r#" + [general] + me_route_no_writer_wait_ms = 5 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_route_no_writer_wait_ms_out_of_range_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.me_route_no_writer_wait_ms must be within [10, 5000]")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn me_route_no_writer_mode_is_parsed() { + let toml = r#" + [general] + me_route_no_writer_mode = "inline_recovery_legacy" + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_route_no_writer_mode_parse_test.toml"); + std::fs::write(&path, toml).unwrap(); + let cfg = ProxyConfig::load(&path).unwrap(); + assert_eq!( + cfg.general.me_route_no_writer_mode, + crate::config::MeRouteNoWriterMode::InlineRecoveryLegacy + ); + let _ = std::fs::remove_file(path); + } + #[test] fn me_hardswap_warmup_defaults_are_set() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index 5dc1c87..a9eaeae 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -183,6 +183,31 @@ impl MeFloorMode { } } +/// Middle-End route behavior when no writer is immediately available. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum MeRouteNoWriterMode { + #[default] + AsyncRecoveryFailfast, + InlineRecoveryLegacy, +} + +impl MeRouteNoWriterMode { + pub fn as_u8(self) -> u8 { + match self { + MeRouteNoWriterMode::AsyncRecoveryFailfast => 0, + MeRouteNoWriterMode::InlineRecoveryLegacy => 1, + } + } + + pub fn from_u8(raw: u8) -> Self { + match raw { + 1 => MeRouteNoWriterMode::InlineRecoveryLegacy, + _ => MeRouteNoWriterMode::AsyncRecoveryFailfast, + } + } +} + /// Per-user unique source IP limit mode. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(rename_all = "snake_case")] @@ -511,6 +536,10 @@ pub struct GeneralConfig { #[serde(default = "default_unknown_dc_log_path")] pub unknown_dc_log_path: Option, + /// Enable unknown-DC file logging. + #[serde(default = "default_unknown_dc_file_log_enabled")] + pub unknown_dc_file_log_enabled: bool, + #[serde(default)] pub log_level: LogLevel, @@ -538,6 +567,22 @@ pub struct GeneralConfig { #[serde(default = "default_me_route_backpressure_high_watermark_pct")] pub me_route_backpressure_high_watermark_pct: u8, + /// ME route behavior when no writer is immediately available. + #[serde(default)] + pub me_route_no_writer_mode: MeRouteNoWriterMode, + + /// Maximum wait time in milliseconds for async-recovery failfast mode. + #[serde(default = "default_me_route_no_writer_wait_ms")] + pub me_route_no_writer_wait_ms: u64, + + /// Number of inline recovery attempts in legacy mode. + #[serde(default = "default_me_route_inline_recovery_attempts")] + pub me_route_inline_recovery_attempts: u32, + + /// Maximum wait time in milliseconds for inline recovery in legacy mode. + #[serde(default = "default_me_route_inline_recovery_wait_ms")] + pub me_route_inline_recovery_wait_ms: u64, + /// [general.links] — proxy link generation overrides. #[serde(default)] pub links: LinksConfig, @@ -719,6 +764,7 @@ impl Default for GeneralConfig { upstream_connect_failfast_hard_errors: default_upstream_connect_failfast_hard_errors(), stun_iface_mismatch_ignore: false, unknown_dc_log_path: default_unknown_dc_log_path(), + unknown_dc_file_log_enabled: default_unknown_dc_file_log_enabled(), log_level: LogLevel::Normal, disable_colors: false, telemetry: TelemetryConfig::default(), @@ -726,6 +772,10 @@ impl Default for GeneralConfig { me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(), me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(), me_route_backpressure_high_watermark_pct: default_me_route_backpressure_high_watermark_pct(), + me_route_no_writer_mode: MeRouteNoWriterMode::default(), + me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(), + me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(), + me_route_inline_recovery_wait_ms: default_me_route_inline_recovery_wait_ms(), links: LinksConfig::default(), crypto_pending_buffer: default_crypto_pending_buffer(), max_client_frame: default_max_client_frame(), diff --git a/src/main.rs b/src/main.rs index c4a9c37..fc7cf0e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -617,6 +617,10 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_route_backpressure_base_timeout_ms, config.general.me_route_backpressure_high_timeout_ms, config.general.me_route_backpressure_high_watermark_pct, + config.general.me_route_no_writer_mode, + config.general.me_route_no_writer_wait_ms, + config.general.me_route_inline_recovery_attempts, + config.general.me_route_inline_recovery_wait_ms, ); match pool.init(pool_size, &rng).await { diff --git a/src/metrics.rs b/src/metrics.rs index 1595445..0ccec94 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1199,6 +1199,48 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp 0 } ); + let _ = writeln!( + out, + "# HELP telemt_me_no_writer_failfast_total ME route failfast errors due to missing writer in bounded wait window" + ); + let _ = writeln!(out, "# TYPE telemt_me_no_writer_failfast_total counter"); + let _ = writeln!( + out, + "telemt_me_no_writer_failfast_total {}", + if me_allows_normal { + stats.get_me_no_writer_failfast_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_async_recovery_trigger_total Async ME recovery trigger attempts from route path" + ); + let _ = writeln!(out, "# TYPE telemt_me_async_recovery_trigger_total counter"); + let _ = writeln!( + out, + "telemt_me_async_recovery_trigger_total {}", + if me_allows_normal { + stats.get_me_async_recovery_trigger_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_inline_recovery_total Legacy inline ME recovery attempts from route path" + ); + let _ = writeln!(out, "# TYPE telemt_me_inline_recovery_total counter"); + let _ = writeln!( + out, + "telemt_me_inline_recovery_total {}", + if me_allows_normal { + stats.get_me_inline_recovery_total() + } else { + 0 + } + ); let unresolved_writer_losses = if me_allows_normal { stats @@ -1237,6 +1279,29 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp let _ = writeln!(out, "# TYPE telemt_user_msgs_from_client counter"); let _ = writeln!(out, "# HELP telemt_user_msgs_to_client Per-user messages sent"); let _ = writeln!(out, "# TYPE telemt_user_msgs_to_client counter"); + let _ = writeln!( + out, + "# HELP telemt_ip_reservation_rollback_total IP reservation rollbacks caused by later limit checks" + ); + let _ = writeln!(out, "# TYPE telemt_ip_reservation_rollback_total counter"); + let _ = writeln!( + out, + "telemt_ip_reservation_rollback_total{{reason=\"tcp_limit\"}} {}", + if core_enabled { + stats.get_ip_reservation_rollback_tcp_limit_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_ip_reservation_rollback_total{{reason=\"quota_limit\"}} {}", + if core_enabled { + stats.get_ip_reservation_rollback_quota_limit_total() + } else { + 0 + } + ); let _ = writeln!( out, "# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag" diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 4bc4b65..2c9fa0c 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -672,42 +672,16 @@ impl RunningClientHandler { R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static, { - let user = &success.user; + let user = success.user.clone(); - if let Err(e) = Self::check_user_limits_static(user, &config, &stats, peer_addr, &ip_tracker).await { + if let Err(e) = Self::check_user_limits_static(&user, &config, &stats, peer_addr, &ip_tracker).await { warn!(user = %user, error = %e, "User limit exceeded"); return Err(e); } - // IP Cleanup Guard: автоматически удаляет IP при выходе из scope - struct IpCleanupGuard { - tracker: Arc, - user: String, - ip: std::net::IpAddr, - } - - impl Drop for IpCleanupGuard { - fn drop(&mut self) { - let tracker = self.tracker.clone(); - let user = self.user.clone(); - let ip = self.ip; - tokio::spawn(async move { - tracker.remove_ip(&user, ip).await; - debug!(user = %user, ip = %ip, "IP cleaned up on disconnect"); - }); - } - } - - let _cleanup = IpCleanupGuard { - tracker: ip_tracker, - user: user.clone(), - ip: peer_addr.ip(), - }; - - // Decide: middle proxy or direct - if config.general.use_middle_proxy { + let relay_result = if config.general.use_middle_proxy { if let Some(ref pool) = me_pool { - return handle_via_middle_proxy( + handle_via_middle_proxy( client_reader, client_writer, success, @@ -718,23 +692,38 @@ impl RunningClientHandler { local_addr, rng, ) - .await; + .await + } else { + warn!("use_middle_proxy=true but MePool not initialized, falling back to direct"); + handle_via_direct( + client_reader, + client_writer, + success, + upstream_manager, + stats, + config, + buffer_pool, + rng, + ) + .await } - warn!("use_middle_proxy=true but MePool not initialized, falling back to direct"); - } + } else { + // Direct mode (original behavior) + handle_via_direct( + client_reader, + client_writer, + success, + upstream_manager, + stats, + config, + buffer_pool, + rng, + ) + .await + }; - // Direct mode (original behavior) - handle_via_direct( - client_reader, - client_writer, - success, - upstream_manager, - stats, - config, - buffer_pool, - rng, - ) - .await + ip_tracker.remove_ip(&user, peer_addr.ip()).await; + relay_result } async fn check_user_limits_static( @@ -752,22 +741,32 @@ impl RunningClientHandler { }); } + let mut ip_reserved = false; // IP limit check - if let Err(reason) = ip_tracker.check_and_add(user, peer_addr.ip()).await { - warn!( - user = %user, - ip = %peer_addr.ip(), - reason = %reason, - "IP limit exceeded" - ); - return Err(ProxyError::ConnectionLimitExceeded { - user: user.to_string(), - }); + match ip_tracker.check_and_add(user, peer_addr.ip()).await { + Ok(()) => { + ip_reserved = true; + } + Err(reason) => { + warn!( + user = %user, + ip = %peer_addr.ip(), + reason = %reason, + "IP limit exceeded" + ); + return Err(ProxyError::ConnectionLimitExceeded { + user: user.to_string(), + }); + } } if let Some(limit) = config.access.user_max_tcp_conns.get(user) && stats.get_user_curr_connects(user) >= *limit as u64 { + if ip_reserved { + ip_tracker.remove_ip(user, peer_addr.ip()).await; + stats.increment_ip_reservation_rollback_tcp_limit_total(); + } return Err(ProxyError::ConnectionLimitExceeded { user: user.to_string(), }); @@ -776,6 +775,10 @@ impl RunningClientHandler { if let Some(quota) = config.access.user_data_quota.get(user) && stats.get_user_total_octets(user) >= *quota { + if ip_reserved { + ip_tracker.remove_ip(user, peer_addr.ip()).await; + stats.increment_ip_reservation_rollback_quota_limit_total(); + } return Err(ProxyError::DataQuotaExceeded { user: user.to_string(), }); diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index e50623d..a1f4945 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -118,10 +118,16 @@ fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result { // Unknown DC requested by client without override: log and fall back. if !config.dc_overrides.contains_key(&dc_key) { warn!(dc_idx = dc_idx, "Requested non-standard DC with no override; falling back to default cluster"); - if let Some(path) = &config.general.unknown_dc_log_path - && let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) + if config.general.unknown_dc_file_log_enabled + && let Some(path) = &config.general.unknown_dc_log_path + && let Ok(handle) = tokio::runtime::Handle::try_current() { - let _ = writeln!(file, "dc_idx={dc_idx}"); + let path = path.clone(); + handle.spawn_blocking(move || { + if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) { + let _ = writeln!(file, "dc_idx={dc_idx}"); + } + }); } } diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 29d7f45..eedc7f6 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -100,6 +100,11 @@ pub struct Stats { me_refill_failed_total: AtomicU64, me_writer_restored_same_endpoint_total: AtomicU64, me_writer_restored_fallback_total: AtomicU64, + me_no_writer_failfast_total: AtomicU64, + me_async_recovery_trigger_total: AtomicU64, + me_inline_recovery_total: AtomicU64, + ip_reservation_rollback_tcp_limit_total: AtomicU64, + ip_reservation_rollback_quota_limit_total: AtomicU64, telemetry_core_enabled: AtomicBool, telemetry_user_enabled: AtomicBool, telemetry_me_level: AtomicU8, @@ -522,6 +527,34 @@ impl Stats { .fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_no_writer_failfast_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_no_writer_failfast_total.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_async_recovery_trigger_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_async_recovery_trigger_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_inline_recovery_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_inline_recovery_total.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_ip_reservation_rollback_tcp_limit_total(&self) { + if self.telemetry_core_enabled() { + self.ip_reservation_rollback_tcp_limit_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_ip_reservation_rollback_quota_limit_total(&self) { + if self.telemetry_core_enabled() { + self.ip_reservation_rollback_quota_limit_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_endpoint_quarantine_total(&self) { if self.telemetry_me_allows_normal() { self.me_endpoint_quarantine_total @@ -791,6 +824,23 @@ impl Stats { pub fn get_me_writer_restored_fallback_total(&self) -> u64 { self.me_writer_restored_fallback_total.load(Ordering::Relaxed) } + pub fn get_me_no_writer_failfast_total(&self) -> u64 { + self.me_no_writer_failfast_total.load(Ordering::Relaxed) + } + pub fn get_me_async_recovery_trigger_total(&self) -> u64 { + self.me_async_recovery_trigger_total.load(Ordering::Relaxed) + } + pub fn get_me_inline_recovery_total(&self) -> u64 { + self.me_inline_recovery_total.load(Ordering::Relaxed) + } + pub fn get_ip_reservation_rollback_tcp_limit_total(&self) -> u64 { + self.ip_reservation_rollback_tcp_limit_total + .load(Ordering::Relaxed) + } + pub fn get_ip_reservation_rollback_quota_limit_total(&self) -> u64 { + self.ip_reservation_rollback_quota_limit_total + .load(Ordering::Relaxed) + } pub fn increment_user_connects(&self, user: &str) { if !self.telemetry_user_enabled() { diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 8c185be..d553944 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -7,7 +7,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::{Mutex, Notify, RwLock, mpsc}; use tokio_util::sync::CancellationToken; -use crate::config::{MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy}; +use crate::config::{MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy}; use crate::crypto::SecureRandom; use crate::network::IpFamily; use crate::network::probe::NetworkDecision; @@ -145,6 +145,10 @@ pub struct MePool { pub(super) secret_atomic_snapshot: AtomicBool, pub(super) me_deterministic_writer_sort: AtomicBool, pub(super) me_socks_kdf_policy: AtomicU8, + pub(super) me_route_no_writer_mode: AtomicU8, + pub(super) me_route_no_writer_wait: Duration, + pub(super) me_route_inline_recovery_attempts: u32, + pub(super) me_route_inline_recovery_wait: Duration, pool_size: usize, } @@ -227,6 +231,10 @@ impl MePool { me_route_backpressure_base_timeout_ms: u64, me_route_backpressure_high_timeout_ms: u64, me_route_backpressure_high_watermark_pct: u8, + me_route_no_writer_mode: MeRouteNoWriterMode, + me_route_no_writer_wait_ms: u64, + me_route_inline_recovery_attempts: u32, + me_route_inline_recovery_wait_ms: u64, ) -> Arc { let registry = Arc::new(ConnRegistry::new()); registry.update_route_backpressure_policy( @@ -343,6 +351,10 @@ impl MePool { secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot), me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort), me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), + me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()), + me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), + me_route_inline_recovery_attempts, + me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms), }) } diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index ba4a419..8bd21ee 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -1,13 +1,14 @@ use std::cmp::Reverse; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::Ordering; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, warn}; +use crate::config::MeRouteNoWriterMode; use crate::error::{ProxyError, Result}; use crate::network::IpFamily; use crate::protocol::constants::RPC_CLOSE_EXT_U32; @@ -49,7 +50,11 @@ impl MePool { our_addr, proto_flags, }; - let mut emergency_attempts = 0; + let no_writer_mode = + MeRouteNoWriterMode::from_u8(self.me_route_no_writer_mode.load(Ordering::Relaxed)); + let mut no_writer_deadline: Option = None; + let mut emergency_attempts = 0u32; + let mut async_recovery_triggered = false; loop { if let Some(current) = self.registry.get_writer(conn_id).await { @@ -74,34 +79,66 @@ impl MePool { let mut writers_snapshot = { let ws = self.writers.read().await; if ws.is_empty() { - // Create waiter before recovery attempts so notify_one permits are not missed. - let waiter = self.writer_available.notified(); drop(ws); - for family in self.family_order() { - let map = match family { - IpFamily::V4 => self.proxy_map_v4.read().await.clone(), - IpFamily::V6 => self.proxy_map_v6.read().await.clone(), - }; - for (_dc, addrs) in map.iter() { - for (ip, port) in addrs { - let addr = SocketAddr::new(*ip, *port); - if self.connect_one(addr, self.rng.as_ref()).await.is_ok() { - self.writer_available.notify_one(); + match no_writer_mode { + MeRouteNoWriterMode::AsyncRecoveryFailfast => { + let deadline = *no_writer_deadline.get_or_insert_with(|| { + Instant::now() + self.me_route_no_writer_wait + }); + if !async_recovery_triggered { + let triggered = + self.trigger_async_recovery_for_target_dc(target_dc).await; + if !triggered { + self.trigger_async_recovery_global().await; + } + async_recovery_triggered = true; + } + if self.wait_for_writer_until(deadline).await { + continue; + } + self.stats.increment_me_no_writer_failfast_total(); + return Err(ProxyError::Proxy( + "No ME writer available in failfast window".into(), + )); + } + MeRouteNoWriterMode::InlineRecoveryLegacy => { + self.stats.increment_me_inline_recovery_total(); + for _ in 0..self.me_route_inline_recovery_attempts.max(1) { + for family in self.family_order() { + let map = match family { + IpFamily::V4 => self.proxy_map_v4.read().await.clone(), + IpFamily::V6 => self.proxy_map_v6.read().await.clone(), + }; + for (_dc, addrs) in &map { + for (ip, port) in addrs { + let addr = SocketAddr::new(*ip, *port); + let _ = self.connect_one(addr, self.rng.as_ref()).await; + } + } + } + if !self.writers.read().await.is_empty() { break; } } - } - } - if !self.writers.read().await.is_empty() { - continue; - } - if tokio::time::timeout(Duration::from_secs(3), waiter).await.is_err() { - if !self.writers.read().await.is_empty() { + if !self.writers.read().await.is_empty() { + continue; + } + let waiter = self.writer_available.notified(); + if tokio::time::timeout(self.me_route_inline_recovery_wait, waiter) + .await + .is_err() + { + if !self.writers.read().await.is_empty() { + continue; + } + self.stats.increment_me_no_writer_failfast_total(); + return Err(ProxyError::Proxy( + "All ME connections dead (legacy wait timeout)".into(), + )); + } continue; } - return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into())); } - continue; } ws.clone() }; @@ -115,46 +152,70 @@ impl MePool { .await; } if candidate_indices.is_empty() { - // Emergency connect-on-demand - if emergency_attempts >= 3 { - return Err(ProxyError::Proxy("No ME writers available for target DC".into())); - } - emergency_attempts += 1; - for family in self.family_order() { - let map_guard = match family { - IpFamily::V4 => self.proxy_map_v4.read().await, - IpFamily::V6 => self.proxy_map_v6.read().await, - }; - if let Some(addrs) = map_guard.get(&(target_dc as i32)) { - let mut shuffled = addrs.clone(); - shuffled.shuffle(&mut rand::rng()); - drop(map_guard); - for (ip, port) in shuffled { - let addr = SocketAddr::new(ip, port); - if self.connect_one(addr, self.rng.as_ref()).await.is_ok() { - break; + match no_writer_mode { + MeRouteNoWriterMode::AsyncRecoveryFailfast => { + let deadline = *no_writer_deadline.get_or_insert_with(|| { + Instant::now() + self.me_route_no_writer_wait + }); + if !async_recovery_triggered { + let triggered = self.trigger_async_recovery_for_target_dc(target_dc).await; + if !triggered { + self.trigger_async_recovery_global().await; + } + async_recovery_triggered = true; + } + if self.wait_for_candidate_until(target_dc, deadline).await { + continue; + } + self.stats.increment_me_no_writer_failfast_total(); + return Err(ProxyError::Proxy( + "No ME writers available for target DC in failfast window".into(), + )); + } + MeRouteNoWriterMode::InlineRecoveryLegacy => { + self.stats.increment_me_inline_recovery_total(); + if emergency_attempts >= self.me_route_inline_recovery_attempts.max(1) { + self.stats.increment_me_no_writer_failfast_total(); + return Err(ProxyError::Proxy("No ME writers available for target DC".into())); + } + emergency_attempts += 1; + for family in self.family_order() { + let map_guard = match family { + IpFamily::V4 => self.proxy_map_v4.read().await, + IpFamily::V6 => self.proxy_map_v6.read().await, + }; + if let Some(addrs) = map_guard.get(&(target_dc as i32)) { + let mut shuffled = addrs.clone(); + shuffled.shuffle(&mut rand::rng()); + drop(map_guard); + for (ip, port) in shuffled { + let addr = SocketAddr::new(ip, port); + if self.connect_one(addr, self.rng.as_ref()).await.is_ok() { + break; + } + } + tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64)).await; + let ws2 = self.writers.read().await; + writers_snapshot = ws2.clone(); + drop(ws2); + candidate_indices = self + .candidate_indices_for_dc(&writers_snapshot, target_dc, false) + .await; + if candidate_indices.is_empty() { + candidate_indices = self + .candidate_indices_for_dc(&writers_snapshot, target_dc, true) + .await; + } + if !candidate_indices.is_empty() { + break; + } } } - tokio::time::sleep(Duration::from_millis(100 * emergency_attempts)).await; - let ws2 = self.writers.read().await; - writers_snapshot = ws2.clone(); - drop(ws2); - candidate_indices = self - .candidate_indices_for_dc(&writers_snapshot, target_dc, false) - .await; if candidate_indices.is_empty() { - candidate_indices = self - .candidate_indices_for_dc(&writers_snapshot, target_dc, true) - .await; - } - if !candidate_indices.is_empty() { - break; + return Err(ProxyError::Proxy("No ME writers available for target DC".into())); } } } - if candidate_indices.is_empty() { - return Err(ProxyError::Proxy("No ME writers available for target DC".into())); - } } let writer_idle_since = self.registry.writer_idle_since_snapshot().await; let now_epoch_secs = Self::now_epoch_secs(); @@ -275,6 +336,129 @@ impl MePool { } } + async fn wait_for_writer_until(&self, deadline: Instant) -> bool { + let waiter = self.writer_available.notified(); + if !self.writers.read().await.is_empty() { + return true; + } + let now = Instant::now(); + if now >= deadline { + return !self.writers.read().await.is_empty(); + } + let timeout = deadline.saturating_duration_since(now); + if tokio::time::timeout(timeout, waiter).await.is_ok() { + return true; + } + !self.writers.read().await.is_empty() + } + + async fn wait_for_candidate_until(&self, target_dc: i16, deadline: Instant) -> bool { + loop { + if self.has_candidate_for_target_dc(target_dc).await { + return true; + } + + let now = Instant::now(); + if now >= deadline { + return self.has_candidate_for_target_dc(target_dc).await; + } + + let remaining = deadline.saturating_duration_since(now); + let sleep_for = remaining.min(Duration::from_millis(25)); + let waiter = self.writer_available.notified(); + tokio::select! { + _ = waiter => {} + _ = tokio::time::sleep(sleep_for) => {} + } + } + } + + async fn has_candidate_for_target_dc(&self, target_dc: i16) -> bool { + let writers_snapshot = { + let ws = self.writers.read().await; + if ws.is_empty() { + return false; + } + ws.clone() + }; + let mut candidate_indices = self + .candidate_indices_for_dc(&writers_snapshot, target_dc, false) + .await; + if candidate_indices.is_empty() { + candidate_indices = self + .candidate_indices_for_dc(&writers_snapshot, target_dc, true) + .await; + } + !candidate_indices.is_empty() + } + + async fn trigger_async_recovery_for_target_dc(self: &Arc, target_dc: i16) -> bool { + let endpoints = self.endpoint_candidates_for_target_dc(target_dc).await; + if endpoints.is_empty() { + return false; + } + self.stats.increment_me_async_recovery_trigger_total(); + for addr in endpoints.into_iter().take(8) { + self.trigger_immediate_refill(addr); + } + true + } + + async fn trigger_async_recovery_global(self: &Arc) { + self.stats.increment_me_async_recovery_trigger_total(); + let mut seen = HashSet::::new(); + for family in self.family_order() { + let map = match family { + IpFamily::V4 => self.proxy_map_v4.read().await.clone(), + IpFamily::V6 => self.proxy_map_v6.read().await.clone(), + }; + for addrs in map.values() { + for (ip, port) in addrs { + let addr = SocketAddr::new(*ip, *port); + if seen.insert(addr) { + self.trigger_immediate_refill(addr); + } + if seen.len() >= 8 { + return; + } + } + } + } + } + + async fn endpoint_candidates_for_target_dc(&self, target_dc: i16) -> Vec { + let key = target_dc as i32; + let mut preferred = Vec::::new(); + let mut seen = HashSet::::new(); + + for family in self.family_order() { + let map = match family { + IpFamily::V4 => self.proxy_map_v4.read().await.clone(), + IpFamily::V6 => self.proxy_map_v6.read().await.clone(), + }; + let mut lookup_keys = vec![key, key.abs(), -key.abs()]; + let def = self.default_dc.load(Ordering::Relaxed); + if def != 0 { + lookup_keys.push(def); + } + for lookup in lookup_keys { + if let Some(addrs) = map.get(&lookup) { + for (ip, port) in addrs { + let addr = SocketAddr::new(*ip, *port); + if seen.insert(addr) { + preferred.push(addr); + } + } + } + } + if !preferred.is_empty() && !self.decision.effective_multipath { + break; + } + } + + preferred + } + pub async fn send_close(self: &Arc, conn_id: u64) -> Result<()> { if let Some(w) = self.registry.get_writer(conn_id).await { let mut p = Vec::with_capacity(12);