Compare commits

..

No commits in common. "cb5753f77c9d7cb12fc68dc069ca70dad111e596" and "7de822dd153aa5f197601e25e60c6ae562bea603" have entirely different histories.

10 changed files with 34 additions and 162 deletions

View File

@ -35,14 +35,11 @@ pub(super) struct RuntimeGatesData {
pub(super) conditional_cast_enabled: bool,
pub(super) me_runtime_ready: bool,
pub(super) me2dc_fallback_enabled: bool,
pub(super) me2dc_fast_enabled: bool,
pub(super) use_middle_proxy: bool,
pub(super) route_mode: &'static str,
pub(super) reroute_active: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) reroute_to_direct_at_epoch_secs: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) reroute_reason: Option<&'static str>,
pub(super) startup_status: &'static str,
pub(super) startup_stage: String,
pub(super) startup_progress_pct: f64,
@ -89,7 +86,6 @@ pub(super) struct EffectiveMiddleProxyLimits {
pub(super) writer_pick_mode: &'static str,
pub(super) writer_pick_sample_size: u8,
pub(super) me2dc_fallback: bool,
pub(super) me2dc_fast: bool,
}
#[derive(Serialize)]
@ -173,8 +169,6 @@ pub(super) async fn build_runtime_gates_data(
let startup_summary = build_runtime_startup_summary(shared).await;
let route_state = shared.route_runtime.snapshot();
let route_mode = route_state.mode.as_str();
let fast_fallback_enabled =
cfg.general.use_middle_proxy && cfg.general.me2dc_fallback && cfg.general.me2dc_fast;
let reroute_active = cfg.general.use_middle_proxy
&& cfg.general.me2dc_fallback
&& matches!(route_state.mode, RelayRouteMode::Direct);
@ -183,15 +177,6 @@ pub(super) async fn build_runtime_gates_data(
} else {
None
};
let reroute_reason = if reroute_active {
if fast_fallback_enabled {
Some("fast_not_ready_fallback")
} else {
Some("strict_grace_fallback")
}
} else {
None
};
let me_runtime_ready = if !cfg.general.use_middle_proxy {
true
} else {
@ -209,12 +194,10 @@ pub(super) async fn build_runtime_gates_data(
conditional_cast_enabled: cfg.general.use_middle_proxy,
me_runtime_ready,
me2dc_fallback_enabled: cfg.general.me2dc_fallback,
me2dc_fast_enabled: fast_fallback_enabled,
use_middle_proxy: cfg.general.use_middle_proxy,
route_mode,
reroute_active,
reroute_to_direct_at_epoch_secs,
reroute_reason,
startup_status: startup_summary.status,
startup_stage: startup_summary.stage,
startup_progress_pct: startup_summary.progress_pct,
@ -280,7 +263,6 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD
writer_pick_mode: me_writer_pick_mode_label(cfg.general.me_writer_pick_mode),
writer_pick_sample_size: cfg.general.me_writer_pick_sample_size,
me2dc_fallback: cfg.general.me2dc_fallback,
me2dc_fast: cfg.general.me2dc_fast,
},
user_ip_policy: EffectiveUserIpPolicyLimits {
global_each: cfg.access.user_max_unique_ips_global_each,

View File

@ -273,10 +273,6 @@ pub(crate) fn default_me2dc_fallback() -> bool {
true
}
pub(crate) fn default_me2dc_fast() -> bool {
false
}
pub(crate) fn default_keepalive_interval() -> u64 {
8
}

View File

@ -672,11 +672,9 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
warned = true;
warn!("config reload: general.me_init_retry_attempts changed; restart required");
}
if old.general.me2dc_fallback != new.general.me2dc_fallback
|| old.general.me2dc_fast != new.general.me2dc_fast
{
if old.general.me2dc_fallback != new.general.me2dc_fallback {
warned = true;
warn!("config reload: general.me2dc_fallback/me2dc_fast changed; restart required");
warn!("config reload: general.me2dc_fallback changed; restart required");
}
if old.general.proxy_config_v4_cache_path != new.general.proxy_config_v4_cache_path
|| old.general.proxy_config_v6_cache_path != new.general.proxy_config_v6_cache_path

View File

@ -1217,7 +1217,6 @@ mod tests {
default_me_init_retry_attempts()
);
assert_eq!(cfg.general.me2dc_fallback, default_me2dc_fallback());
assert_eq!(cfg.general.me2dc_fast, default_me2dc_fast());
assert_eq!(
cfg.general.proxy_config_v4_cache_path,
default_proxy_config_v4_cache_path()
@ -1357,7 +1356,6 @@ mod tests {
default_me_init_retry_attempts()
);
assert_eq!(general.me2dc_fallback, default_me2dc_fallback());
assert_eq!(general.me2dc_fast, default_me2dc_fast());
assert_eq!(
general.proxy_config_v4_cache_path,
default_proxy_config_v4_cache_path()

View File

@ -429,11 +429,6 @@ pub struct GeneralConfig {
#[serde(default = "default_me2dc_fallback")]
pub me2dc_fallback: bool,
/// Fast ME->Direct fallback mode for new sessions.
/// Active only when both `use_middle_proxy=true` and `me2dc_fallback=true`.
#[serde(default = "default_me2dc_fast")]
pub me2dc_fast: bool,
/// Enable ME keepalive padding frames.
#[serde(default = "default_true")]
pub me_keepalive_enabled: bool,
@ -944,7 +939,6 @@ impl Default for GeneralConfig {
middle_proxy_warm_standby: default_middle_proxy_warm_standby(),
me_init_retry_attempts: default_me_init_retry_attempts(),
me2dc_fallback: default_me2dc_fallback(),
me2dc_fast: default_me2dc_fast(),
me_keepalive_enabled: default_true(),
me_keepalive_interval_secs: default_keepalive_interval(),
me_keepalive_jitter_secs: default_keepalive_jitter(),

View File

@ -21,29 +21,10 @@ pub(crate) async fn configure_admission_gate(
if config.general.use_middle_proxy {
if let Some(pool) = me_pool.as_ref() {
let initial_ready = pool.admission_ready_conditional_cast().await;
let mut fallback_enabled = config.general.me2dc_fallback;
let mut fast_fallback_enabled = fallback_enabled && config.general.me2dc_fast;
let (initial_gate_open, initial_route_mode, initial_fallback_reason) = if initial_ready
{
(true, RelayRouteMode::Middle, None)
} else if fast_fallback_enabled {
(
true,
RelayRouteMode::Direct,
Some("fast_not_ready_fallback"),
)
} else {
(false, RelayRouteMode::Middle, None)
};
admission_tx.send_replace(initial_gate_open);
let _ = route_runtime.set_mode(initial_route_mode);
admission_tx.send_replace(initial_ready);
let _ = route_runtime.set_mode(RelayRouteMode::Middle);
if initial_ready {
info!("Conditional-admission gate: open / ME pool READY");
} else if let Some(reason) = initial_fallback_reason {
warn!(
fallback_reason = reason,
"Conditional-admission gate opened in ME fast fallback mode"
);
} else {
warn!("Conditional-admission gate: closed / ME pool is NOT ready)");
}
@ -53,9 +34,10 @@ pub(crate) async fn configure_admission_gate(
let route_runtime_gate = route_runtime.clone();
let mut config_rx_gate = config_rx.clone();
let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1);
let mut fallback_enabled = config.general.me2dc_fallback;
tokio::spawn(async move {
let mut gate_open = initial_gate_open;
let mut route_mode = initial_route_mode;
let mut gate_open = initial_ready;
let mut route_mode = RelayRouteMode::Middle;
let mut ready_observed = initial_ready;
let mut not_ready_since = if initial_ready {
None
@ -71,23 +53,16 @@ pub(crate) async fn configure_admission_gate(
let cfg = config_rx_gate.borrow_and_update().clone();
admission_poll_ms = cfg.general.me_admission_poll_ms.max(1);
fallback_enabled = cfg.general.me2dc_fallback;
fast_fallback_enabled = cfg.general.me2dc_fallback && cfg.general.me2dc_fast;
continue;
}
_ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {}
}
let ready = pool_for_gate.admission_ready_conditional_cast().await;
let now = Instant::now();
let (next_gate_open, next_route_mode, next_fallback_reason) = if ready {
let (next_gate_open, next_route_mode, next_fallback_active) = if ready {
ready_observed = true;
not_ready_since = None;
(true, RelayRouteMode::Middle, None)
} else if fast_fallback_enabled {
(
true,
RelayRouteMode::Direct,
Some("fast_not_ready_fallback"),
)
(true, RelayRouteMode::Middle, false)
} else {
let not_ready_started_at = *not_ready_since.get_or_insert(now);
let not_ready_for = now.saturating_duration_since(not_ready_started_at);
@ -97,12 +72,11 @@ pub(crate) async fn configure_admission_gate(
STARTUP_FALLBACK_AFTER
};
if fallback_enabled && not_ready_for > fallback_after {
(true, RelayRouteMode::Direct, Some("strict_grace_fallback"))
(true, RelayRouteMode::Direct, true)
} else {
(false, RelayRouteMode::Middle, None)
(false, RelayRouteMode::Middle, false)
}
};
let next_fallback_active = next_fallback_reason.is_some();
if next_route_mode != route_mode {
route_mode = next_route_mode;
@ -114,28 +88,17 @@ pub(crate) async fn configure_admission_gate(
"Middle-End routing restored for new sessions"
);
} else {
let fallback_reason = next_fallback_reason.unwrap_or("unknown");
if fallback_reason == "strict_grace_fallback" {
let fallback_after = if ready_observed {
RUNTIME_FALLBACK_AFTER
} else {
STARTUP_FALLBACK_AFTER
};
warn!(
target_mode = route_mode.as_str(),
cutover_generation = snapshot.generation,
grace_secs = fallback_after.as_secs(),
fallback_reason,
"ME pool stayed not-ready beyond grace; routing new sessions via Direct-DC"
);
let fallback_after = if ready_observed {
RUNTIME_FALLBACK_AFTER
} else {
warn!(
target_mode = route_mode.as_str(),
cutover_generation = snapshot.generation,
fallback_reason,
"ME pool not-ready; routing new sessions via Direct-DC (fast mode)"
);
}
STARTUP_FALLBACK_AFTER
};
warn!(
target_mode = route_mode.as_str(),
cutover_generation = snapshot.generation,
grace_secs = fallback_after.as_secs(),
"ME pool stayed not-ready beyond grace; routing new sessions via Direct-DC"
);
}
}
}
@ -145,10 +108,7 @@ pub(crate) async fn configure_admission_gate(
admission_tx_gate.send_replace(gate_open);
if gate_open {
if next_fallback_active {
warn!(
fallback_reason = next_fallback_reason.unwrap_or("unknown"),
"Conditional-admission gate opened in ME fallback mode"
);
warn!("Conditional-admission gate opened in ME fallback mode");
} else {
info!("Conditional-admission gate opened / ME pool READY");
}

View File

@ -45,6 +45,10 @@ const RELAY_IDLE_IO_POLL_MAX: Duration = Duration::from_secs(1);
const TINY_FRAME_DEBT_PER_TINY: u32 = 8;
const TINY_FRAME_DEBT_LIMIT: u32 = 512;
#[cfg(test)]
const C2ME_SEND_TIMEOUT: Duration = Duration::from_millis(50);
#[cfg(not(test))]
const C2ME_SEND_TIMEOUT: Duration = Duration::from_secs(5);
#[cfg(test)]
const RELAY_TEST_STEP_TIMEOUT: Duration = Duration::from_secs(1);
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
@ -644,7 +648,6 @@ pub(crate) fn relay_idle_pressure_test_scope() -> std::sync::MutexGuard<'static,
async fn enqueue_c2me_command(
tx: &mpsc::Sender<C2MeCommand>,
cmd: C2MeCommand,
send_timeout: Option<Duration>,
) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
match tx.try_send(cmd) {
Ok(()) => Ok(()),
@ -655,18 +658,12 @@ async fn enqueue_c2me_command(
if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
tokio::task::yield_now().await;
}
let reserve_result = match send_timeout {
Some(send_timeout) => match timeout(send_timeout, tx.reserve()).await {
Ok(result) => result,
Err(_) => return Err(mpsc::error::SendError(cmd)),
},
None => tx.reserve().await,
};
match reserve_result {
Ok(permit) => {
match timeout(C2ME_SEND_TIMEOUT, tx.reserve()).await {
Ok(Ok(permit)) => {
permit.send(cmd);
Ok(())
}
Ok(Err(_)) => Err(mpsc::error::SendError(cmd)),
Err(_) => Err(mpsc::error::SendError(cmd)),
}
}
@ -792,10 +789,6 @@ where
.general
.me_c2me_channel_capacity
.max(C2ME_CHANNEL_CAPACITY_FALLBACK);
let c2me_send_timeout = match config.general.me_c2me_send_timeout_ms {
0 => None,
timeout_ms => Some(Duration::from_millis(timeout_ms)),
};
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
let me_pool_c2me = me_pool.clone();
let c2me_sender = tokio::spawn(async move {
@ -1172,7 +1165,7 @@ where
user = %user,
"Middle-relay pressure eviction for idle-candidate session"
);
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
main_result = Err(ProxyError::Proxy(
"middle-relay session evicted under pressure (idle-candidate)".to_string(),
));
@ -1191,7 +1184,7 @@ where
"Cutover affected middle session, closing client connection"
);
tokio::time::sleep(delay).await;
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
break;
}
@ -1249,12 +1242,8 @@ where
flags |= RPC_FLAG_NOT_ENCRYPTED;
}
// Keep client read loop lightweight: route heavy ME send path via a dedicated task.
if enqueue_c2me_command(
&c2me_tx,
C2MeCommand::Data { payload, flags },
c2me_send_timeout,
)
.await
if enqueue_c2me_command(&c2me_tx, C2MeCommand::Data { payload, flags })
.await
.is_err()
{
main_result = Err(ProxyError::Proxy("ME sender channel closed".into()));
@ -1264,9 +1253,7 @@ where
Ok(None) => {
debug!(conn_id, "Client EOF");
client_closed = true;
let _ =
enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout)
.await;
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
break;
}
Err(e) => {

View File

@ -126,7 +126,6 @@ async fn c2me_channel_full_path_yields_then_sends() {
payload: make_pooled_payload(&[0xBB, 0xCC]),
flags: 2,
},
None,
)
.await
});

View File

@ -13,7 +13,6 @@ use super::pool::{MePool, RefillDcKey, RefillEndpointKey, WriterContour};
const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20;
const ME_FLAP_QUARANTINE_SECS: u64 = 25;
const ME_FLAP_MIN_UPTIME_MILLIS: u64 = 500;
const ME_REFILL_TOTAL_ATTEMPT_CAP: u32 = 20;
impl MePool {
@ -36,17 +35,6 @@ impl MePool {
uptime: Duration,
reason: &'static str,
) {
if uptime < Duration::from_millis(ME_FLAP_MIN_UPTIME_MILLIS) {
debug!(
%addr,
reason,
uptime_ms = uptime.as_millis(),
min_uptime_ms = ME_FLAP_MIN_UPTIME_MILLIS,
"Skipping flap quarantine for ultra-short writer lifetime"
);
return;
}
if uptime > Duration::from_secs(ME_FLAP_UPTIME_THRESHOLD_SECS) {
return;
}

View File

@ -309,36 +309,6 @@ async fn adversarial_blackhat_single_unexpected_remove_establishes_single_quaran
);
}
#[tokio::test]
async fn remove_ultra_short_uptime_writer_skips_flap_quarantine() {
let pool = make_pool().await;
let writer_id = 931;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 12, 0, 131)), 443);
let before_total = pool.stats.get_me_endpoint_quarantine_total();
let before_unexpected = pool.stats.get_me_endpoint_quarantine_unexpected_total();
insert_writer(
&pool,
writer_id,
2,
addr,
false,
Instant::now() - Duration::from_millis(50),
)
.await;
pool.remove_writer_and_close_clients(writer_id).await;
assert!(
!pool.is_endpoint_quarantined(addr).await,
"ultra-short unexpected lifetime must not quarantine endpoint"
);
assert_eq!(pool.stats.get_me_endpoint_quarantine_total(), before_total);
assert_eq!(
pool.stats.get_me_endpoint_quarantine_unexpected_total(),
before_unexpected + 1
);
}
#[tokio::test]
async fn integration_old_uptime_writer_does_not_trigger_flap_quarantine() {
let pool = make_pool().await;