diff --git a/src/api/runtime_zero.rs b/src/api/runtime_zero.rs index a6eb163..0ed84a8 100644 --- a/src/api/runtime_zero.rs +++ b/src/api/runtime_zero.rs @@ -35,11 +35,14 @@ pub(super) struct RuntimeGatesData { pub(super) conditional_cast_enabled: bool, pub(super) me_runtime_ready: bool, pub(super) me2dc_fallback_enabled: bool, + pub(super) me2dc_fast_enabled: bool, pub(super) use_middle_proxy: bool, pub(super) route_mode: &'static str, pub(super) reroute_active: bool, #[serde(skip_serializing_if = "Option::is_none")] pub(super) reroute_to_direct_at_epoch_secs: Option<u64>, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) reroute_reason: Option<&'static str>, pub(super) startup_status: &'static str, pub(super) startup_stage: String, pub(super) startup_progress_pct: f64, @@ -86,6 +89,7 @@ pub(super) struct EffectiveMiddleProxyLimits { pub(super) writer_pick_mode: &'static str, pub(super) writer_pick_sample_size: u8, pub(super) me2dc_fallback: bool, + pub(super) me2dc_fast: bool, } #[derive(Serialize)] @@ -169,6 +173,8 @@ pub(super) async fn build_runtime_gates_data( let startup_summary = build_runtime_startup_summary(shared).await; let route_state = shared.route_runtime.snapshot(); let route_mode = route_state.mode.as_str(); + let fast_fallback_enabled = + cfg.general.use_middle_proxy && cfg.general.me2dc_fallback && cfg.general.me2dc_fast; let reroute_active = cfg.general.use_middle_proxy && cfg.general.me2dc_fallback && matches!(route_state.mode, RelayRouteMode::Direct); @@ -177,6 +183,15 @@ pub(super) async fn build_runtime_gates_data( } else { None }; + let reroute_reason = if reroute_active { + if fast_fallback_enabled { + Some("fast_not_ready_fallback") + } else { + Some("strict_grace_fallback") + } + } else { + None + }; let me_runtime_ready = if !cfg.general.use_middle_proxy { true } else { @@ -194,10 +209,12 @@ pub(super) async fn build_runtime_gates_data( conditional_cast_enabled: cfg.general.use_middle_proxy, me_runtime_ready, 
me2dc_fallback_enabled: cfg.general.me2dc_fallback, + me2dc_fast_enabled: fast_fallback_enabled, use_middle_proxy: cfg.general.use_middle_proxy, route_mode, reroute_active, reroute_to_direct_at_epoch_secs, + reroute_reason, startup_status: startup_summary.status, startup_stage: startup_summary.stage, startup_progress_pct: startup_summary.progress_pct, @@ -263,6 +280,7 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD writer_pick_mode: me_writer_pick_mode_label(cfg.general.me_writer_pick_mode), writer_pick_sample_size: cfg.general.me_writer_pick_sample_size, me2dc_fallback: cfg.general.me2dc_fallback, + me2dc_fast: cfg.general.me2dc_fast, }, user_ip_policy: EffectiveUserIpPolicyLimits { global_each: cfg.access.user_max_unique_ips_global_each, diff --git a/src/config/defaults.rs b/src/config/defaults.rs index b0aaf5b..608e1b8 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -273,6 +273,10 @@ pub(crate) fn default_me2dc_fallback() -> bool { true } +pub(crate) fn default_me2dc_fast() -> bool { + false +} + pub(crate) fn default_keepalive_interval() -> u64 { 8 } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index f8064dd..9bd2927 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -672,9 +672,11 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b warned = true; warn!("config reload: general.me_init_retry_attempts changed; restart required"); } - if old.general.me2dc_fallback != new.general.me2dc_fallback { + if old.general.me2dc_fallback != new.general.me2dc_fallback + || old.general.me2dc_fast != new.general.me2dc_fast + { warned = true; - warn!("config reload: general.me2dc_fallback changed; restart required"); + warn!("config reload: general.me2dc_fallback/me2dc_fast changed; restart required"); } if old.general.proxy_config_v4_cache_path != new.general.proxy_config_v4_cache_path || old.general.proxy_config_v6_cache_path != 
new.general.proxy_config_v6_cache_path diff --git a/src/config/load.rs b/src/config/load.rs index 3cb6627..7892e2c 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -1217,6 +1217,7 @@ mod tests { default_me_init_retry_attempts() ); assert_eq!(cfg.general.me2dc_fallback, default_me2dc_fallback()); + assert_eq!(cfg.general.me2dc_fast, default_me2dc_fast()); assert_eq!( cfg.general.proxy_config_v4_cache_path, default_proxy_config_v4_cache_path() @@ -1356,6 +1357,7 @@ mod tests { default_me_init_retry_attempts() ); assert_eq!(general.me2dc_fallback, default_me2dc_fallback()); + assert_eq!(general.me2dc_fast, default_me2dc_fast()); assert_eq!( general.proxy_config_v4_cache_path, default_proxy_config_v4_cache_path() diff --git a/src/config/types.rs b/src/config/types.rs index 3939664..cb14747 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -429,6 +429,11 @@ pub struct GeneralConfig { #[serde(default = "default_me2dc_fallback")] pub me2dc_fallback: bool, + /// Fast ME->Direct fallback mode for new sessions. + /// Active only when both `use_middle_proxy=true` and `me2dc_fallback=true`. + #[serde(default = "default_me2dc_fast")] + pub me2dc_fast: bool, + /// Enable ME keepalive padding frames. 
#[serde(default = "default_true")] pub me_keepalive_enabled: bool, @@ -939,6 +944,7 @@ impl Default for GeneralConfig { middle_proxy_warm_standby: default_middle_proxy_warm_standby(), me_init_retry_attempts: default_me_init_retry_attempts(), me2dc_fallback: default_me2dc_fallback(), + me2dc_fast: default_me2dc_fast(), me_keepalive_enabled: default_true(), me_keepalive_interval_secs: default_keepalive_interval(), me_keepalive_jitter_secs: default_keepalive_jitter(), diff --git a/src/maestro/admission.rs b/src/maestro/admission.rs index 69a9c9f..c781078 100644 --- a/src/maestro/admission.rs +++ b/src/maestro/admission.rs @@ -21,10 +21,24 @@ pub(crate) async fn configure_admission_gate( if config.general.use_middle_proxy { if let Some(pool) = me_pool.as_ref() { let initial_ready = pool.admission_ready_conditional_cast().await; - admission_tx.send_replace(initial_ready); - let _ = route_runtime.set_mode(RelayRouteMode::Middle); + let mut fallback_enabled = config.general.me2dc_fallback; + let mut fast_fallback_enabled = fallback_enabled && config.general.me2dc_fast; + let (initial_gate_open, initial_route_mode, initial_fallback_reason) = if initial_ready { + (true, RelayRouteMode::Middle, None) + } else if fast_fallback_enabled { + (true, RelayRouteMode::Direct, Some("fast_not_ready_fallback")) + } else { + (false, RelayRouteMode::Middle, None) + }; + admission_tx.send_replace(initial_gate_open); + let _ = route_runtime.set_mode(initial_route_mode); if initial_ready { info!("Conditional-admission gate: open / ME pool READY"); + } else if let Some(reason) = initial_fallback_reason { + warn!( + fallback_reason = reason, + "Conditional-admission gate opened in ME fast fallback mode" + ); } else { warn!("Conditional-admission gate: closed / ME pool is NOT ready)"); } @@ -34,10 +48,9 @@ pub(crate) async fn configure_admission_gate( let route_runtime_gate = route_runtime.clone(); let mut config_rx_gate = config_rx.clone(); let mut admission_poll_ms = 
config.general.me_admission_poll_ms.max(1); - let mut fallback_enabled = config.general.me2dc_fallback; tokio::spawn(async move { - let mut gate_open = initial_ready; - let mut route_mode = RelayRouteMode::Middle; + let mut gate_open = initial_gate_open; + let mut route_mode = initial_route_mode; let mut ready_observed = initial_ready; let mut not_ready_since = if initial_ready { None @@ -53,16 +66,19 @@ pub(crate) async fn configure_admission_gate( let cfg = config_rx_gate.borrow_and_update().clone(); admission_poll_ms = cfg.general.me_admission_poll_ms.max(1); fallback_enabled = cfg.general.me2dc_fallback; + fast_fallback_enabled = cfg.general.me2dc_fallback && cfg.general.me2dc_fast; continue; } _ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {} } let ready = pool_for_gate.admission_ready_conditional_cast().await; let now = Instant::now(); - let (next_gate_open, next_route_mode, next_fallback_active) = if ready { + let (next_gate_open, next_route_mode, next_fallback_reason) = if ready { ready_observed = true; not_ready_since = None; - (true, RelayRouteMode::Middle, false) + (true, RelayRouteMode::Middle, None) + } else if fast_fallback_enabled { + (true, RelayRouteMode::Direct, Some("fast_not_ready_fallback")) } else { let not_ready_started_at = *not_ready_since.get_or_insert(now); let not_ready_for = now.saturating_duration_since(not_ready_started_at); @@ -72,11 +88,12 @@ pub(crate) async fn configure_admission_gate( STARTUP_FALLBACK_AFTER }; if fallback_enabled && not_ready_for > fallback_after { - (true, RelayRouteMode::Direct, true) + (true, RelayRouteMode::Direct, Some("strict_grace_fallback")) } else { - (false, RelayRouteMode::Middle, false) + (false, RelayRouteMode::Middle, None) } }; + let next_fallback_active = next_fallback_reason.is_some(); if next_route_mode != route_mode { route_mode = next_route_mode; @@ -88,17 +105,28 @@ pub(crate) async fn configure_admission_gate( "Middle-End routing restored for new sessions" ); } else { - 
let fallback_after = if ready_observed { - RUNTIME_FALLBACK_AFTER + let fallback_reason = next_fallback_reason.unwrap_or("unknown"); + if fallback_reason == "strict_grace_fallback" { + let fallback_after = if ready_observed { + RUNTIME_FALLBACK_AFTER + } else { + STARTUP_FALLBACK_AFTER + }; + warn!( + target_mode = route_mode.as_str(), + cutover_generation = snapshot.generation, + grace_secs = fallback_after.as_secs(), + fallback_reason, + "ME pool stayed not-ready beyond grace; routing new sessions via Direct-DC" + ); } else { - STARTUP_FALLBACK_AFTER - }; - warn!( - target_mode = route_mode.as_str(), - cutover_generation = snapshot.generation, - grace_secs = fallback_after.as_secs(), - "ME pool stayed not-ready beyond grace; routing new sessions via Direct-DC" - ); + warn!( + target_mode = route_mode.as_str(), + cutover_generation = snapshot.generation, + fallback_reason, + "ME pool not-ready; routing new sessions via Direct-DC (fast mode)" + ); + } } } } @@ -108,7 +136,10 @@ pub(crate) async fn configure_admission_gate( admission_tx_gate.send_replace(gate_open); if gate_open { if next_fallback_active { - warn!("Conditional-admission gate opened in ME fallback mode"); + warn!( + fallback_reason = next_fallback_reason.unwrap_or("unknown"), + "Conditional-admission gate opened in ME fallback mode" + ); } else { info!("Conditional-admission gate opened / ME pool READY"); } diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index e8fc52a..d2f37a6 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -45,10 +45,6 @@ const RELAY_IDLE_IO_POLL_MAX: Duration = Duration::from_secs(1); const TINY_FRAME_DEBT_PER_TINY: u32 = 8; const TINY_FRAME_DEBT_LIMIT: u32 = 512; #[cfg(test)] -const C2ME_SEND_TIMEOUT: Duration = Duration::from_millis(50); -#[cfg(not(test))] -const C2ME_SEND_TIMEOUT: Duration = Duration::from_secs(5); -#[cfg(test)] const RELAY_TEST_STEP_TIMEOUT: Duration = Duration::from_secs(1); const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: 
usize = 1; const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096; @@ -648,6 +644,7 @@ pub(crate) fn relay_idle_pressure_test_scope() -> std::sync::MutexGuard<'static, async fn enqueue_c2me_command( tx: &mpsc::Sender<C2MeCommand>, cmd: C2MeCommand, + send_timeout: Option<Duration>, ) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> { match tx.try_send(cmd) { Ok(()) => Ok(()), @@ -658,12 +655,18 @@ async fn enqueue_c2me_command( if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS { tokio::task::yield_now().await; } - match timeout(C2ME_SEND_TIMEOUT, tx.reserve()).await { - Ok(Ok(permit)) => { + let reserve_result = match send_timeout { + Some(send_timeout) => match timeout(send_timeout, tx.reserve()).await { + Ok(result) => result, + Err(_) => return Err(mpsc::error::SendError(cmd)), + }, + None => tx.reserve().await, + }; + match reserve_result { + Ok(permit) => { permit.send(cmd); Ok(()) } - Ok(Err(_)) => Err(mpsc::error::SendError(cmd)), Err(_) => Err(mpsc::error::SendError(cmd)), } } @@ -789,6 +792,10 @@ where .general .me_c2me_channel_capacity .max(C2ME_CHANNEL_CAPACITY_FALLBACK); + let c2me_send_timeout = match config.general.me_c2me_send_timeout_ms { + 0 => None, + timeout_ms => Some(Duration::from_millis(timeout_ms)), + }; let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity); let me_pool_c2me = me_pool.clone(); let c2me_sender = tokio::spawn(async move { @@ -1165,7 +1172,7 @@ where user = %user, "Middle-relay pressure eviction for idle-candidate session" ); - let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await; + let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await; main_result = Err(ProxyError::Proxy( "middle-relay session evicted under pressure (idle-candidate)".to_string(), )); @@ -1184,7 +1191,7 @@ where "Cutover affected middle session, closing client connection" ); tokio::time::sleep(delay).await; - let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await; + let _ = enqueue_c2me_command(&c2me_tx, 
C2MeCommand::Close, c2me_send_timeout).await; main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string())); break; } @@ -1242,8 +1249,12 @@ where flags |= RPC_FLAG_NOT_ENCRYPTED; } // Keep client read loop lightweight: route heavy ME send path via a dedicated task. - if enqueue_c2me_command(&c2me_tx, C2MeCommand::Data { payload, flags }) - .await + if enqueue_c2me_command( + &c2me_tx, + C2MeCommand::Data { payload, flags }, + c2me_send_timeout, + ) + .await .is_err() { main_result = Err(ProxyError::Proxy("ME sender channel closed".into())); @@ -1253,7 +1264,9 @@ where Ok(None) => { debug!(conn_id, "Client EOF"); client_closed = true; - let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await; + let _ = + enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout) + .await; break; } Err(e) => { diff --git a/src/proxy/tests/middle_relay_stub_completion_security_tests.rs b/src/proxy/tests/middle_relay_stub_completion_security_tests.rs index 2635a28..fbb9081 100644 --- a/src/proxy/tests/middle_relay_stub_completion_security_tests.rs +++ b/src/proxy/tests/middle_relay_stub_completion_security_tests.rs @@ -126,6 +126,7 @@ async fn c2me_channel_full_path_yields_then_sends() { payload: make_pooled_payload(&[0xBB, 0xCC]), flags: 2, }, + None, ) .await }); diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 06e450b..69d8aa0 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -13,6 +13,7 @@ use super::pool::{MePool, RefillDcKey, RefillEndpointKey, WriterContour}; const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20; const ME_FLAP_QUARANTINE_SECS: u64 = 25; +const ME_FLAP_MIN_UPTIME_MILLIS: u64 = 500; const ME_REFILL_TOTAL_ATTEMPT_CAP: u32 = 20; impl MePool { @@ -35,6 +36,17 @@ impl MePool { uptime: Duration, reason: &'static str, ) { + if uptime < Duration::from_millis(ME_FLAP_MIN_UPTIME_MILLIS) { + debug!( + %addr, + reason, + uptime_ms = 
uptime.as_millis(), + min_uptime_ms = ME_FLAP_MIN_UPTIME_MILLIS, + "Skipping flap quarantine for ultra-short writer lifetime" + ); + return; + } + if uptime > Duration::from_secs(ME_FLAP_UPTIME_THRESHOLD_SECS) { return; } diff --git a/src/transport/middle_proxy/tests/pool_writer_security_tests.rs b/src/transport/middle_proxy/tests/pool_writer_security_tests.rs index e287624..0184e11 100644 --- a/src/transport/middle_proxy/tests/pool_writer_security_tests.rs +++ b/src/transport/middle_proxy/tests/pool_writer_security_tests.rs @@ -309,6 +309,36 @@ async fn adversarial_blackhat_single_unexpected_remove_establishes_single_quaran ); } +#[tokio::test] +async fn remove_ultra_short_uptime_writer_skips_flap_quarantine() { + let pool = make_pool().await; + let writer_id = 931; + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 12, 0, 131)), 443); + let before_total = pool.stats.get_me_endpoint_quarantine_total(); + let before_unexpected = pool.stats.get_me_endpoint_quarantine_unexpected_total(); + insert_writer( + &pool, + writer_id, + 2, + addr, + false, + Instant::now() - Duration::from_millis(50), + ) + .await; + + pool.remove_writer_and_close_clients(writer_id).await; + + assert!( + !pool.is_endpoint_quarantined(addr).await, + "ultra-short unexpected lifetime must not quarantine endpoint" + ); + assert_eq!(pool.stats.get_me_endpoint_quarantine_total(), before_total); + assert_eq!( + pool.stats.get_me_endpoint_quarantine_unexpected_total(), + before_unexpected + 1 + ); +} + #[tokio::test] async fn integration_old_uptime_writer_does_not_trigger_flap_quarantine() { let pool = make_pool().await;