mirror of https://github.com/telemt/telemt.git
Middle Relay fixes
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
parent
7de822dd15
commit
7a075b2ffe
|
|
@ -35,11 +35,14 @@ pub(super) struct RuntimeGatesData {
|
||||||
pub(super) conditional_cast_enabled: bool,
|
pub(super) conditional_cast_enabled: bool,
|
||||||
pub(super) me_runtime_ready: bool,
|
pub(super) me_runtime_ready: bool,
|
||||||
pub(super) me2dc_fallback_enabled: bool,
|
pub(super) me2dc_fallback_enabled: bool,
|
||||||
|
pub(super) me2dc_fast_enabled: bool,
|
||||||
pub(super) use_middle_proxy: bool,
|
pub(super) use_middle_proxy: bool,
|
||||||
pub(super) route_mode: &'static str,
|
pub(super) route_mode: &'static str,
|
||||||
pub(super) reroute_active: bool,
|
pub(super) reroute_active: bool,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub(super) reroute_to_direct_at_epoch_secs: Option<u64>,
|
pub(super) reroute_to_direct_at_epoch_secs: Option<u64>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub(super) reroute_reason: Option<&'static str>,
|
||||||
pub(super) startup_status: &'static str,
|
pub(super) startup_status: &'static str,
|
||||||
pub(super) startup_stage: String,
|
pub(super) startup_stage: String,
|
||||||
pub(super) startup_progress_pct: f64,
|
pub(super) startup_progress_pct: f64,
|
||||||
|
|
@ -86,6 +89,7 @@ pub(super) struct EffectiveMiddleProxyLimits {
|
||||||
pub(super) writer_pick_mode: &'static str,
|
pub(super) writer_pick_mode: &'static str,
|
||||||
pub(super) writer_pick_sample_size: u8,
|
pub(super) writer_pick_sample_size: u8,
|
||||||
pub(super) me2dc_fallback: bool,
|
pub(super) me2dc_fallback: bool,
|
||||||
|
pub(super) me2dc_fast: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
|
|
@ -169,6 +173,8 @@ pub(super) async fn build_runtime_gates_data(
|
||||||
let startup_summary = build_runtime_startup_summary(shared).await;
|
let startup_summary = build_runtime_startup_summary(shared).await;
|
||||||
let route_state = shared.route_runtime.snapshot();
|
let route_state = shared.route_runtime.snapshot();
|
||||||
let route_mode = route_state.mode.as_str();
|
let route_mode = route_state.mode.as_str();
|
||||||
|
let fast_fallback_enabled =
|
||||||
|
cfg.general.use_middle_proxy && cfg.general.me2dc_fallback && cfg.general.me2dc_fast;
|
||||||
let reroute_active = cfg.general.use_middle_proxy
|
let reroute_active = cfg.general.use_middle_proxy
|
||||||
&& cfg.general.me2dc_fallback
|
&& cfg.general.me2dc_fallback
|
||||||
&& matches!(route_state.mode, RelayRouteMode::Direct);
|
&& matches!(route_state.mode, RelayRouteMode::Direct);
|
||||||
|
|
@ -177,6 +183,15 @@ pub(super) async fn build_runtime_gates_data(
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
let reroute_reason = if reroute_active {
|
||||||
|
if fast_fallback_enabled {
|
||||||
|
Some("fast_not_ready_fallback")
|
||||||
|
} else {
|
||||||
|
Some("strict_grace_fallback")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
let me_runtime_ready = if !cfg.general.use_middle_proxy {
|
let me_runtime_ready = if !cfg.general.use_middle_proxy {
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -194,10 +209,12 @@ pub(super) async fn build_runtime_gates_data(
|
||||||
conditional_cast_enabled: cfg.general.use_middle_proxy,
|
conditional_cast_enabled: cfg.general.use_middle_proxy,
|
||||||
me_runtime_ready,
|
me_runtime_ready,
|
||||||
me2dc_fallback_enabled: cfg.general.me2dc_fallback,
|
me2dc_fallback_enabled: cfg.general.me2dc_fallback,
|
||||||
|
me2dc_fast_enabled: fast_fallback_enabled,
|
||||||
use_middle_proxy: cfg.general.use_middle_proxy,
|
use_middle_proxy: cfg.general.use_middle_proxy,
|
||||||
route_mode,
|
route_mode,
|
||||||
reroute_active,
|
reroute_active,
|
||||||
reroute_to_direct_at_epoch_secs,
|
reroute_to_direct_at_epoch_secs,
|
||||||
|
reroute_reason,
|
||||||
startup_status: startup_summary.status,
|
startup_status: startup_summary.status,
|
||||||
startup_stage: startup_summary.stage,
|
startup_stage: startup_summary.stage,
|
||||||
startup_progress_pct: startup_summary.progress_pct,
|
startup_progress_pct: startup_summary.progress_pct,
|
||||||
|
|
@ -263,6 +280,7 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD
|
||||||
writer_pick_mode: me_writer_pick_mode_label(cfg.general.me_writer_pick_mode),
|
writer_pick_mode: me_writer_pick_mode_label(cfg.general.me_writer_pick_mode),
|
||||||
writer_pick_sample_size: cfg.general.me_writer_pick_sample_size,
|
writer_pick_sample_size: cfg.general.me_writer_pick_sample_size,
|
||||||
me2dc_fallback: cfg.general.me2dc_fallback,
|
me2dc_fallback: cfg.general.me2dc_fallback,
|
||||||
|
me2dc_fast: cfg.general.me2dc_fast,
|
||||||
},
|
},
|
||||||
user_ip_policy: EffectiveUserIpPolicyLimits {
|
user_ip_policy: EffectiveUserIpPolicyLimits {
|
||||||
global_each: cfg.access.user_max_unique_ips_global_each,
|
global_each: cfg.access.user_max_unique_ips_global_each,
|
||||||
|
|
|
||||||
|
|
@ -273,6 +273,10 @@ pub(crate) fn default_me2dc_fallback() -> bool {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me2dc_fast() -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_keepalive_interval() -> u64 {
|
pub(crate) fn default_keepalive_interval() -> u64 {
|
||||||
8
|
8
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -672,9 +672,11 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
|
||||||
warned = true;
|
warned = true;
|
||||||
warn!("config reload: general.me_init_retry_attempts changed; restart required");
|
warn!("config reload: general.me_init_retry_attempts changed; restart required");
|
||||||
}
|
}
|
||||||
if old.general.me2dc_fallback != new.general.me2dc_fallback {
|
if old.general.me2dc_fallback != new.general.me2dc_fallback
|
||||||
|
|| old.general.me2dc_fast != new.general.me2dc_fast
|
||||||
|
{
|
||||||
warned = true;
|
warned = true;
|
||||||
warn!("config reload: general.me2dc_fallback changed; restart required");
|
warn!("config reload: general.me2dc_fallback/me2dc_fast changed; restart required");
|
||||||
}
|
}
|
||||||
if old.general.proxy_config_v4_cache_path != new.general.proxy_config_v4_cache_path
|
if old.general.proxy_config_v4_cache_path != new.general.proxy_config_v4_cache_path
|
||||||
|| old.general.proxy_config_v6_cache_path != new.general.proxy_config_v6_cache_path
|
|| old.general.proxy_config_v6_cache_path != new.general.proxy_config_v6_cache_path
|
||||||
|
|
|
||||||
|
|
@ -1217,6 +1217,7 @@ mod tests {
|
||||||
default_me_init_retry_attempts()
|
default_me_init_retry_attempts()
|
||||||
);
|
);
|
||||||
assert_eq!(cfg.general.me2dc_fallback, default_me2dc_fallback());
|
assert_eq!(cfg.general.me2dc_fallback, default_me2dc_fallback());
|
||||||
|
assert_eq!(cfg.general.me2dc_fast, default_me2dc_fast());
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
cfg.general.proxy_config_v4_cache_path,
|
cfg.general.proxy_config_v4_cache_path,
|
||||||
default_proxy_config_v4_cache_path()
|
default_proxy_config_v4_cache_path()
|
||||||
|
|
@ -1356,6 +1357,7 @@ mod tests {
|
||||||
default_me_init_retry_attempts()
|
default_me_init_retry_attempts()
|
||||||
);
|
);
|
||||||
assert_eq!(general.me2dc_fallback, default_me2dc_fallback());
|
assert_eq!(general.me2dc_fallback, default_me2dc_fallback());
|
||||||
|
assert_eq!(general.me2dc_fast, default_me2dc_fast());
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
general.proxy_config_v4_cache_path,
|
general.proxy_config_v4_cache_path,
|
||||||
default_proxy_config_v4_cache_path()
|
default_proxy_config_v4_cache_path()
|
||||||
|
|
|
||||||
|
|
@ -429,6 +429,11 @@ pub struct GeneralConfig {
|
||||||
#[serde(default = "default_me2dc_fallback")]
|
#[serde(default = "default_me2dc_fallback")]
|
||||||
pub me2dc_fallback: bool,
|
pub me2dc_fallback: bool,
|
||||||
|
|
||||||
|
/// Fast ME->Direct fallback mode for new sessions.
|
||||||
|
/// Active only when both `use_middle_proxy=true` and `me2dc_fallback=true`.
|
||||||
|
#[serde(default = "default_me2dc_fast")]
|
||||||
|
pub me2dc_fast: bool,
|
||||||
|
|
||||||
/// Enable ME keepalive padding frames.
|
/// Enable ME keepalive padding frames.
|
||||||
#[serde(default = "default_true")]
|
#[serde(default = "default_true")]
|
||||||
pub me_keepalive_enabled: bool,
|
pub me_keepalive_enabled: bool,
|
||||||
|
|
@ -939,6 +944,7 @@ impl Default for GeneralConfig {
|
||||||
middle_proxy_warm_standby: default_middle_proxy_warm_standby(),
|
middle_proxy_warm_standby: default_middle_proxy_warm_standby(),
|
||||||
me_init_retry_attempts: default_me_init_retry_attempts(),
|
me_init_retry_attempts: default_me_init_retry_attempts(),
|
||||||
me2dc_fallback: default_me2dc_fallback(),
|
me2dc_fallback: default_me2dc_fallback(),
|
||||||
|
me2dc_fast: default_me2dc_fast(),
|
||||||
me_keepalive_enabled: default_true(),
|
me_keepalive_enabled: default_true(),
|
||||||
me_keepalive_interval_secs: default_keepalive_interval(),
|
me_keepalive_interval_secs: default_keepalive_interval(),
|
||||||
me_keepalive_jitter_secs: default_keepalive_jitter(),
|
me_keepalive_jitter_secs: default_keepalive_jitter(),
|
||||||
|
|
|
||||||
|
|
@ -21,10 +21,24 @@ pub(crate) async fn configure_admission_gate(
|
||||||
if config.general.use_middle_proxy {
|
if config.general.use_middle_proxy {
|
||||||
if let Some(pool) = me_pool.as_ref() {
|
if let Some(pool) = me_pool.as_ref() {
|
||||||
let initial_ready = pool.admission_ready_conditional_cast().await;
|
let initial_ready = pool.admission_ready_conditional_cast().await;
|
||||||
admission_tx.send_replace(initial_ready);
|
let mut fallback_enabled = config.general.me2dc_fallback;
|
||||||
let _ = route_runtime.set_mode(RelayRouteMode::Middle);
|
let mut fast_fallback_enabled = fallback_enabled && config.general.me2dc_fast;
|
||||||
|
let (initial_gate_open, initial_route_mode, initial_fallback_reason) = if initial_ready {
|
||||||
|
(true, RelayRouteMode::Middle, None)
|
||||||
|
} else if fast_fallback_enabled {
|
||||||
|
(true, RelayRouteMode::Direct, Some("fast_not_ready_fallback"))
|
||||||
|
} else {
|
||||||
|
(false, RelayRouteMode::Middle, None)
|
||||||
|
};
|
||||||
|
admission_tx.send_replace(initial_gate_open);
|
||||||
|
let _ = route_runtime.set_mode(initial_route_mode);
|
||||||
if initial_ready {
|
if initial_ready {
|
||||||
info!("Conditional-admission gate: open / ME pool READY");
|
info!("Conditional-admission gate: open / ME pool READY");
|
||||||
|
} else if let Some(reason) = initial_fallback_reason {
|
||||||
|
warn!(
|
||||||
|
fallback_reason = reason,
|
||||||
|
"Conditional-admission gate opened in ME fast fallback mode"
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
warn!("Conditional-admission gate: closed / ME pool is NOT ready)");
|
warn!("Conditional-admission gate: closed / ME pool is NOT ready)");
|
||||||
}
|
}
|
||||||
|
|
@ -34,10 +48,9 @@ pub(crate) async fn configure_admission_gate(
|
||||||
let route_runtime_gate = route_runtime.clone();
|
let route_runtime_gate = route_runtime.clone();
|
||||||
let mut config_rx_gate = config_rx.clone();
|
let mut config_rx_gate = config_rx.clone();
|
||||||
let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1);
|
let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1);
|
||||||
let mut fallback_enabled = config.general.me2dc_fallback;
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut gate_open = initial_ready;
|
let mut gate_open = initial_gate_open;
|
||||||
let mut route_mode = RelayRouteMode::Middle;
|
let mut route_mode = initial_route_mode;
|
||||||
let mut ready_observed = initial_ready;
|
let mut ready_observed = initial_ready;
|
||||||
let mut not_ready_since = if initial_ready {
|
let mut not_ready_since = if initial_ready {
|
||||||
None
|
None
|
||||||
|
|
@ -53,16 +66,19 @@ pub(crate) async fn configure_admission_gate(
|
||||||
let cfg = config_rx_gate.borrow_and_update().clone();
|
let cfg = config_rx_gate.borrow_and_update().clone();
|
||||||
admission_poll_ms = cfg.general.me_admission_poll_ms.max(1);
|
admission_poll_ms = cfg.general.me_admission_poll_ms.max(1);
|
||||||
fallback_enabled = cfg.general.me2dc_fallback;
|
fallback_enabled = cfg.general.me2dc_fallback;
|
||||||
|
fast_fallback_enabled = cfg.general.me2dc_fallback && cfg.general.me2dc_fast;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
_ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {}
|
_ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {}
|
||||||
}
|
}
|
||||||
let ready = pool_for_gate.admission_ready_conditional_cast().await;
|
let ready = pool_for_gate.admission_ready_conditional_cast().await;
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let (next_gate_open, next_route_mode, next_fallback_active) = if ready {
|
let (next_gate_open, next_route_mode, next_fallback_reason) = if ready {
|
||||||
ready_observed = true;
|
ready_observed = true;
|
||||||
not_ready_since = None;
|
not_ready_since = None;
|
||||||
(true, RelayRouteMode::Middle, false)
|
(true, RelayRouteMode::Middle, None)
|
||||||
|
} else if fast_fallback_enabled {
|
||||||
|
(true, RelayRouteMode::Direct, Some("fast_not_ready_fallback"))
|
||||||
} else {
|
} else {
|
||||||
let not_ready_started_at = *not_ready_since.get_or_insert(now);
|
let not_ready_started_at = *not_ready_since.get_or_insert(now);
|
||||||
let not_ready_for = now.saturating_duration_since(not_ready_started_at);
|
let not_ready_for = now.saturating_duration_since(not_ready_started_at);
|
||||||
|
|
@ -72,11 +88,12 @@ pub(crate) async fn configure_admission_gate(
|
||||||
STARTUP_FALLBACK_AFTER
|
STARTUP_FALLBACK_AFTER
|
||||||
};
|
};
|
||||||
if fallback_enabled && not_ready_for > fallback_after {
|
if fallback_enabled && not_ready_for > fallback_after {
|
||||||
(true, RelayRouteMode::Direct, true)
|
(true, RelayRouteMode::Direct, Some("strict_grace_fallback"))
|
||||||
} else {
|
} else {
|
||||||
(false, RelayRouteMode::Middle, false)
|
(false, RelayRouteMode::Middle, None)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let next_fallback_active = next_fallback_reason.is_some();
|
||||||
|
|
||||||
if next_route_mode != route_mode {
|
if next_route_mode != route_mode {
|
||||||
route_mode = next_route_mode;
|
route_mode = next_route_mode;
|
||||||
|
|
@ -88,17 +105,28 @@ pub(crate) async fn configure_admission_gate(
|
||||||
"Middle-End routing restored for new sessions"
|
"Middle-End routing restored for new sessions"
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
let fallback_after = if ready_observed {
|
let fallback_reason = next_fallback_reason.unwrap_or("unknown");
|
||||||
RUNTIME_FALLBACK_AFTER
|
if fallback_reason == "strict_grace_fallback" {
|
||||||
|
let fallback_after = if ready_observed {
|
||||||
|
RUNTIME_FALLBACK_AFTER
|
||||||
|
} else {
|
||||||
|
STARTUP_FALLBACK_AFTER
|
||||||
|
};
|
||||||
|
warn!(
|
||||||
|
target_mode = route_mode.as_str(),
|
||||||
|
cutover_generation = snapshot.generation,
|
||||||
|
grace_secs = fallback_after.as_secs(),
|
||||||
|
fallback_reason,
|
||||||
|
"ME pool stayed not-ready beyond grace; routing new sessions via Direct-DC"
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
STARTUP_FALLBACK_AFTER
|
warn!(
|
||||||
};
|
target_mode = route_mode.as_str(),
|
||||||
warn!(
|
cutover_generation = snapshot.generation,
|
||||||
target_mode = route_mode.as_str(),
|
fallback_reason,
|
||||||
cutover_generation = snapshot.generation,
|
"ME pool not-ready; routing new sessions via Direct-DC (fast mode)"
|
||||||
grace_secs = fallback_after.as_secs(),
|
);
|
||||||
"ME pool stayed not-ready beyond grace; routing new sessions via Direct-DC"
|
}
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -108,7 +136,10 @@ pub(crate) async fn configure_admission_gate(
|
||||||
admission_tx_gate.send_replace(gate_open);
|
admission_tx_gate.send_replace(gate_open);
|
||||||
if gate_open {
|
if gate_open {
|
||||||
if next_fallback_active {
|
if next_fallback_active {
|
||||||
warn!("Conditional-admission gate opened in ME fallback mode");
|
warn!(
|
||||||
|
fallback_reason = next_fallback_reason.unwrap_or("unknown"),
|
||||||
|
"Conditional-admission gate opened in ME fallback mode"
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
info!("Conditional-admission gate opened / ME pool READY");
|
info!("Conditional-admission gate opened / ME pool READY");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -45,10 +45,6 @@ const RELAY_IDLE_IO_POLL_MAX: Duration = Duration::from_secs(1);
|
||||||
const TINY_FRAME_DEBT_PER_TINY: u32 = 8;
|
const TINY_FRAME_DEBT_PER_TINY: u32 = 8;
|
||||||
const TINY_FRAME_DEBT_LIMIT: u32 = 512;
|
const TINY_FRAME_DEBT_LIMIT: u32 = 512;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
const C2ME_SEND_TIMEOUT: Duration = Duration::from_millis(50);
|
|
||||||
#[cfg(not(test))]
|
|
||||||
const C2ME_SEND_TIMEOUT: Duration = Duration::from_secs(5);
|
|
||||||
#[cfg(test)]
|
|
||||||
const RELAY_TEST_STEP_TIMEOUT: Duration = Duration::from_secs(1);
|
const RELAY_TEST_STEP_TIMEOUT: Duration = Duration::from_secs(1);
|
||||||
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
|
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
|
||||||
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
|
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
|
||||||
|
|
@ -648,6 +644,7 @@ pub(crate) fn relay_idle_pressure_test_scope() -> std::sync::MutexGuard<'static,
|
||||||
async fn enqueue_c2me_command(
|
async fn enqueue_c2me_command(
|
||||||
tx: &mpsc::Sender<C2MeCommand>,
|
tx: &mpsc::Sender<C2MeCommand>,
|
||||||
cmd: C2MeCommand,
|
cmd: C2MeCommand,
|
||||||
|
send_timeout: Option<Duration>,
|
||||||
) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
|
) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
|
||||||
match tx.try_send(cmd) {
|
match tx.try_send(cmd) {
|
||||||
Ok(()) => Ok(()),
|
Ok(()) => Ok(()),
|
||||||
|
|
@ -658,12 +655,18 @@ async fn enqueue_c2me_command(
|
||||||
if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
|
if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
}
|
}
|
||||||
match timeout(C2ME_SEND_TIMEOUT, tx.reserve()).await {
|
let reserve_result = match send_timeout {
|
||||||
Ok(Ok(permit)) => {
|
Some(send_timeout) => match timeout(send_timeout, tx.reserve()).await {
|
||||||
|
Ok(result) => result,
|
||||||
|
Err(_) => return Err(mpsc::error::SendError(cmd)),
|
||||||
|
},
|
||||||
|
None => tx.reserve().await,
|
||||||
|
};
|
||||||
|
match reserve_result {
|
||||||
|
Ok(permit) => {
|
||||||
permit.send(cmd);
|
permit.send(cmd);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Ok(Err(_)) => Err(mpsc::error::SendError(cmd)),
|
|
||||||
Err(_) => Err(mpsc::error::SendError(cmd)),
|
Err(_) => Err(mpsc::error::SendError(cmd)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -789,6 +792,10 @@ where
|
||||||
.general
|
.general
|
||||||
.me_c2me_channel_capacity
|
.me_c2me_channel_capacity
|
||||||
.max(C2ME_CHANNEL_CAPACITY_FALLBACK);
|
.max(C2ME_CHANNEL_CAPACITY_FALLBACK);
|
||||||
|
let c2me_send_timeout = match config.general.me_c2me_send_timeout_ms {
|
||||||
|
0 => None,
|
||||||
|
timeout_ms => Some(Duration::from_millis(timeout_ms)),
|
||||||
|
};
|
||||||
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
|
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
|
||||||
let me_pool_c2me = me_pool.clone();
|
let me_pool_c2me = me_pool.clone();
|
||||||
let c2me_sender = tokio::spawn(async move {
|
let c2me_sender = tokio::spawn(async move {
|
||||||
|
|
@ -1165,7 +1172,7 @@ where
|
||||||
user = %user,
|
user = %user,
|
||||||
"Middle-relay pressure eviction for idle-candidate session"
|
"Middle-relay pressure eviction for idle-candidate session"
|
||||||
);
|
);
|
||||||
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
|
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
|
||||||
main_result = Err(ProxyError::Proxy(
|
main_result = Err(ProxyError::Proxy(
|
||||||
"middle-relay session evicted under pressure (idle-candidate)".to_string(),
|
"middle-relay session evicted under pressure (idle-candidate)".to_string(),
|
||||||
));
|
));
|
||||||
|
|
@ -1184,7 +1191,7 @@ where
|
||||||
"Cutover affected middle session, closing client connection"
|
"Cutover affected middle session, closing client connection"
|
||||||
);
|
);
|
||||||
tokio::time::sleep(delay).await;
|
tokio::time::sleep(delay).await;
|
||||||
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
|
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
|
||||||
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
|
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
@ -1242,8 +1249,12 @@ where
|
||||||
flags |= RPC_FLAG_NOT_ENCRYPTED;
|
flags |= RPC_FLAG_NOT_ENCRYPTED;
|
||||||
}
|
}
|
||||||
// Keep client read loop lightweight: route heavy ME send path via a dedicated task.
|
// Keep client read loop lightweight: route heavy ME send path via a dedicated task.
|
||||||
if enqueue_c2me_command(&c2me_tx, C2MeCommand::Data { payload, flags })
|
if enqueue_c2me_command(
|
||||||
.await
|
&c2me_tx,
|
||||||
|
C2MeCommand::Data { payload, flags },
|
||||||
|
c2me_send_timeout,
|
||||||
|
)
|
||||||
|
.await
|
||||||
.is_err()
|
.is_err()
|
||||||
{
|
{
|
||||||
main_result = Err(ProxyError::Proxy("ME sender channel closed".into()));
|
main_result = Err(ProxyError::Proxy("ME sender channel closed".into()));
|
||||||
|
|
@ -1253,7 +1264,9 @@ where
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
debug!(conn_id, "Client EOF");
|
debug!(conn_id, "Client EOF");
|
||||||
client_closed = true;
|
client_closed = true;
|
||||||
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
|
let _ =
|
||||||
|
enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout)
|
||||||
|
.await;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
|
||||||
|
|
@ -126,6 +126,7 @@ async fn c2me_channel_full_path_yields_then_sends() {
|
||||||
payload: make_pooled_payload(&[0xBB, 0xCC]),
|
payload: make_pooled_payload(&[0xBB, 0xCC]),
|
||||||
flags: 2,
|
flags: 2,
|
||||||
},
|
},
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ use super::pool::{MePool, RefillDcKey, RefillEndpointKey, WriterContour};
|
||||||
|
|
||||||
const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20;
|
const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20;
|
||||||
const ME_FLAP_QUARANTINE_SECS: u64 = 25;
|
const ME_FLAP_QUARANTINE_SECS: u64 = 25;
|
||||||
|
const ME_FLAP_MIN_UPTIME_MILLIS: u64 = 500;
|
||||||
const ME_REFILL_TOTAL_ATTEMPT_CAP: u32 = 20;
|
const ME_REFILL_TOTAL_ATTEMPT_CAP: u32 = 20;
|
||||||
|
|
||||||
impl MePool {
|
impl MePool {
|
||||||
|
|
@ -35,6 +36,17 @@ impl MePool {
|
||||||
uptime: Duration,
|
uptime: Duration,
|
||||||
reason: &'static str,
|
reason: &'static str,
|
||||||
) {
|
) {
|
||||||
|
if uptime < Duration::from_millis(ME_FLAP_MIN_UPTIME_MILLIS) {
|
||||||
|
debug!(
|
||||||
|
%addr,
|
||||||
|
reason,
|
||||||
|
uptime_ms = uptime.as_millis(),
|
||||||
|
min_uptime_ms = ME_FLAP_MIN_UPTIME_MILLIS,
|
||||||
|
"Skipping flap quarantine for ultra-short writer lifetime"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if uptime > Duration::from_secs(ME_FLAP_UPTIME_THRESHOLD_SECS) {
|
if uptime > Duration::from_secs(ME_FLAP_UPTIME_THRESHOLD_SECS) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -309,6 +309,36 @@ async fn adversarial_blackhat_single_unexpected_remove_establishes_single_quaran
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn remove_ultra_short_uptime_writer_skips_flap_quarantine() {
|
||||||
|
let pool = make_pool().await;
|
||||||
|
let writer_id = 931;
|
||||||
|
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 12, 0, 131)), 443);
|
||||||
|
let before_total = pool.stats.get_me_endpoint_quarantine_total();
|
||||||
|
let before_unexpected = pool.stats.get_me_endpoint_quarantine_unexpected_total();
|
||||||
|
insert_writer(
|
||||||
|
&pool,
|
||||||
|
writer_id,
|
||||||
|
2,
|
||||||
|
addr,
|
||||||
|
false,
|
||||||
|
Instant::now() - Duration::from_millis(50),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
pool.remove_writer_and_close_clients(writer_id).await;
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!pool.is_endpoint_quarantined(addr).await,
|
||||||
|
"ultra-short unexpected lifetime must not quarantine endpoint"
|
||||||
|
);
|
||||||
|
assert_eq!(pool.stats.get_me_endpoint_quarantine_total(), before_total);
|
||||||
|
assert_eq!(
|
||||||
|
pool.stats.get_me_endpoint_quarantine_unexpected_total(),
|
||||||
|
before_unexpected + 1
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn integration_old_uptime_writer_does_not_trigger_flap_quarantine() {
|
async fn integration_old_uptime_writer_does_not_trigger_flap_quarantine() {
|
||||||
let pool = make_pool().await;
|
let pool = make_pool().await;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue