Compare commits

..

No commits in common. "cb5753f77c9d7cb12fc68dc069ca70dad111e596" and "7de822dd153aa5f197601e25e60c6ae562bea603" have entirely different histories.

10 changed files with 34 additions and 162 deletions

View File

@ -35,14 +35,11 @@ pub(super) struct RuntimeGatesData {
pub(super) conditional_cast_enabled: bool, pub(super) conditional_cast_enabled: bool,
pub(super) me_runtime_ready: bool, pub(super) me_runtime_ready: bool,
pub(super) me2dc_fallback_enabled: bool, pub(super) me2dc_fallback_enabled: bool,
pub(super) me2dc_fast_enabled: bool,
pub(super) use_middle_proxy: bool, pub(super) use_middle_proxy: bool,
pub(super) route_mode: &'static str, pub(super) route_mode: &'static str,
pub(super) reroute_active: bool, pub(super) reroute_active: bool,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub(super) reroute_to_direct_at_epoch_secs: Option<u64>, pub(super) reroute_to_direct_at_epoch_secs: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) reroute_reason: Option<&'static str>,
pub(super) startup_status: &'static str, pub(super) startup_status: &'static str,
pub(super) startup_stage: String, pub(super) startup_stage: String,
pub(super) startup_progress_pct: f64, pub(super) startup_progress_pct: f64,
@ -89,7 +86,6 @@ pub(super) struct EffectiveMiddleProxyLimits {
pub(super) writer_pick_mode: &'static str, pub(super) writer_pick_mode: &'static str,
pub(super) writer_pick_sample_size: u8, pub(super) writer_pick_sample_size: u8,
pub(super) me2dc_fallback: bool, pub(super) me2dc_fallback: bool,
pub(super) me2dc_fast: bool,
} }
#[derive(Serialize)] #[derive(Serialize)]
@ -173,8 +169,6 @@ pub(super) async fn build_runtime_gates_data(
let startup_summary = build_runtime_startup_summary(shared).await; let startup_summary = build_runtime_startup_summary(shared).await;
let route_state = shared.route_runtime.snapshot(); let route_state = shared.route_runtime.snapshot();
let route_mode = route_state.mode.as_str(); let route_mode = route_state.mode.as_str();
let fast_fallback_enabled =
cfg.general.use_middle_proxy && cfg.general.me2dc_fallback && cfg.general.me2dc_fast;
let reroute_active = cfg.general.use_middle_proxy let reroute_active = cfg.general.use_middle_proxy
&& cfg.general.me2dc_fallback && cfg.general.me2dc_fallback
&& matches!(route_state.mode, RelayRouteMode::Direct); && matches!(route_state.mode, RelayRouteMode::Direct);
@ -183,15 +177,6 @@ pub(super) async fn build_runtime_gates_data(
} else { } else {
None None
}; };
let reroute_reason = if reroute_active {
if fast_fallback_enabled {
Some("fast_not_ready_fallback")
} else {
Some("strict_grace_fallback")
}
} else {
None
};
let me_runtime_ready = if !cfg.general.use_middle_proxy { let me_runtime_ready = if !cfg.general.use_middle_proxy {
true true
} else { } else {
@ -209,12 +194,10 @@ pub(super) async fn build_runtime_gates_data(
conditional_cast_enabled: cfg.general.use_middle_proxy, conditional_cast_enabled: cfg.general.use_middle_proxy,
me_runtime_ready, me_runtime_ready,
me2dc_fallback_enabled: cfg.general.me2dc_fallback, me2dc_fallback_enabled: cfg.general.me2dc_fallback,
me2dc_fast_enabled: fast_fallback_enabled,
use_middle_proxy: cfg.general.use_middle_proxy, use_middle_proxy: cfg.general.use_middle_proxy,
route_mode, route_mode,
reroute_active, reroute_active,
reroute_to_direct_at_epoch_secs, reroute_to_direct_at_epoch_secs,
reroute_reason,
startup_status: startup_summary.status, startup_status: startup_summary.status,
startup_stage: startup_summary.stage, startup_stage: startup_summary.stage,
startup_progress_pct: startup_summary.progress_pct, startup_progress_pct: startup_summary.progress_pct,
@ -280,7 +263,6 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD
writer_pick_mode: me_writer_pick_mode_label(cfg.general.me_writer_pick_mode), writer_pick_mode: me_writer_pick_mode_label(cfg.general.me_writer_pick_mode),
writer_pick_sample_size: cfg.general.me_writer_pick_sample_size, writer_pick_sample_size: cfg.general.me_writer_pick_sample_size,
me2dc_fallback: cfg.general.me2dc_fallback, me2dc_fallback: cfg.general.me2dc_fallback,
me2dc_fast: cfg.general.me2dc_fast,
}, },
user_ip_policy: EffectiveUserIpPolicyLimits { user_ip_policy: EffectiveUserIpPolicyLimits {
global_each: cfg.access.user_max_unique_ips_global_each, global_each: cfg.access.user_max_unique_ips_global_each,

View File

@ -273,10 +273,6 @@ pub(crate) fn default_me2dc_fallback() -> bool {
true true
} }
pub(crate) fn default_me2dc_fast() -> bool {
false
}
pub(crate) fn default_keepalive_interval() -> u64 { pub(crate) fn default_keepalive_interval() -> u64 {
8 8
} }

View File

@ -672,11 +672,9 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
warned = true; warned = true;
warn!("config reload: general.me_init_retry_attempts changed; restart required"); warn!("config reload: general.me_init_retry_attempts changed; restart required");
} }
if old.general.me2dc_fallback != new.general.me2dc_fallback if old.general.me2dc_fallback != new.general.me2dc_fallback {
|| old.general.me2dc_fast != new.general.me2dc_fast
{
warned = true; warned = true;
warn!("config reload: general.me2dc_fallback/me2dc_fast changed; restart required"); warn!("config reload: general.me2dc_fallback changed; restart required");
} }
if old.general.proxy_config_v4_cache_path != new.general.proxy_config_v4_cache_path if old.general.proxy_config_v4_cache_path != new.general.proxy_config_v4_cache_path
|| old.general.proxy_config_v6_cache_path != new.general.proxy_config_v6_cache_path || old.general.proxy_config_v6_cache_path != new.general.proxy_config_v6_cache_path

View File

@ -1217,7 +1217,6 @@ mod tests {
default_me_init_retry_attempts() default_me_init_retry_attempts()
); );
assert_eq!(cfg.general.me2dc_fallback, default_me2dc_fallback()); assert_eq!(cfg.general.me2dc_fallback, default_me2dc_fallback());
assert_eq!(cfg.general.me2dc_fast, default_me2dc_fast());
assert_eq!( assert_eq!(
cfg.general.proxy_config_v4_cache_path, cfg.general.proxy_config_v4_cache_path,
default_proxy_config_v4_cache_path() default_proxy_config_v4_cache_path()
@ -1357,7 +1356,6 @@ mod tests {
default_me_init_retry_attempts() default_me_init_retry_attempts()
); );
assert_eq!(general.me2dc_fallback, default_me2dc_fallback()); assert_eq!(general.me2dc_fallback, default_me2dc_fallback());
assert_eq!(general.me2dc_fast, default_me2dc_fast());
assert_eq!( assert_eq!(
general.proxy_config_v4_cache_path, general.proxy_config_v4_cache_path,
default_proxy_config_v4_cache_path() default_proxy_config_v4_cache_path()

View File

@ -429,11 +429,6 @@ pub struct GeneralConfig {
#[serde(default = "default_me2dc_fallback")] #[serde(default = "default_me2dc_fallback")]
pub me2dc_fallback: bool, pub me2dc_fallback: bool,
/// Fast ME->Direct fallback mode for new sessions.
/// Active only when both `use_middle_proxy=true` and `me2dc_fallback=true`.
#[serde(default = "default_me2dc_fast")]
pub me2dc_fast: bool,
/// Enable ME keepalive padding frames. /// Enable ME keepalive padding frames.
#[serde(default = "default_true")] #[serde(default = "default_true")]
pub me_keepalive_enabled: bool, pub me_keepalive_enabled: bool,
@ -944,7 +939,6 @@ impl Default for GeneralConfig {
middle_proxy_warm_standby: default_middle_proxy_warm_standby(), middle_proxy_warm_standby: default_middle_proxy_warm_standby(),
me_init_retry_attempts: default_me_init_retry_attempts(), me_init_retry_attempts: default_me_init_retry_attempts(),
me2dc_fallback: default_me2dc_fallback(), me2dc_fallback: default_me2dc_fallback(),
me2dc_fast: default_me2dc_fast(),
me_keepalive_enabled: default_true(), me_keepalive_enabled: default_true(),
me_keepalive_interval_secs: default_keepalive_interval(), me_keepalive_interval_secs: default_keepalive_interval(),
me_keepalive_jitter_secs: default_keepalive_jitter(), me_keepalive_jitter_secs: default_keepalive_jitter(),

View File

@ -21,29 +21,10 @@ pub(crate) async fn configure_admission_gate(
if config.general.use_middle_proxy { if config.general.use_middle_proxy {
if let Some(pool) = me_pool.as_ref() { if let Some(pool) = me_pool.as_ref() {
let initial_ready = pool.admission_ready_conditional_cast().await; let initial_ready = pool.admission_ready_conditional_cast().await;
let mut fallback_enabled = config.general.me2dc_fallback; admission_tx.send_replace(initial_ready);
let mut fast_fallback_enabled = fallback_enabled && config.general.me2dc_fast; let _ = route_runtime.set_mode(RelayRouteMode::Middle);
let (initial_gate_open, initial_route_mode, initial_fallback_reason) = if initial_ready
{
(true, RelayRouteMode::Middle, None)
} else if fast_fallback_enabled {
(
true,
RelayRouteMode::Direct,
Some("fast_not_ready_fallback"),
)
} else {
(false, RelayRouteMode::Middle, None)
};
admission_tx.send_replace(initial_gate_open);
let _ = route_runtime.set_mode(initial_route_mode);
if initial_ready { if initial_ready {
info!("Conditional-admission gate: open / ME pool READY"); info!("Conditional-admission gate: open / ME pool READY");
} else if let Some(reason) = initial_fallback_reason {
warn!(
fallback_reason = reason,
"Conditional-admission gate opened in ME fast fallback mode"
);
} else { } else {
warn!("Conditional-admission gate: closed / ME pool is NOT ready)"); warn!("Conditional-admission gate: closed / ME pool is NOT ready)");
} }
@ -53,9 +34,10 @@ pub(crate) async fn configure_admission_gate(
let route_runtime_gate = route_runtime.clone(); let route_runtime_gate = route_runtime.clone();
let mut config_rx_gate = config_rx.clone(); let mut config_rx_gate = config_rx.clone();
let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1); let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1);
let mut fallback_enabled = config.general.me2dc_fallback;
tokio::spawn(async move { tokio::spawn(async move {
let mut gate_open = initial_gate_open; let mut gate_open = initial_ready;
let mut route_mode = initial_route_mode; let mut route_mode = RelayRouteMode::Middle;
let mut ready_observed = initial_ready; let mut ready_observed = initial_ready;
let mut not_ready_since = if initial_ready { let mut not_ready_since = if initial_ready {
None None
@ -71,23 +53,16 @@ pub(crate) async fn configure_admission_gate(
let cfg = config_rx_gate.borrow_and_update().clone(); let cfg = config_rx_gate.borrow_and_update().clone();
admission_poll_ms = cfg.general.me_admission_poll_ms.max(1); admission_poll_ms = cfg.general.me_admission_poll_ms.max(1);
fallback_enabled = cfg.general.me2dc_fallback; fallback_enabled = cfg.general.me2dc_fallback;
fast_fallback_enabled = cfg.general.me2dc_fallback && cfg.general.me2dc_fast;
continue; continue;
} }
_ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {} _ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {}
} }
let ready = pool_for_gate.admission_ready_conditional_cast().await; let ready = pool_for_gate.admission_ready_conditional_cast().await;
let now = Instant::now(); let now = Instant::now();
let (next_gate_open, next_route_mode, next_fallback_reason) = if ready { let (next_gate_open, next_route_mode, next_fallback_active) = if ready {
ready_observed = true; ready_observed = true;
not_ready_since = None; not_ready_since = None;
(true, RelayRouteMode::Middle, None) (true, RelayRouteMode::Middle, false)
} else if fast_fallback_enabled {
(
true,
RelayRouteMode::Direct,
Some("fast_not_ready_fallback"),
)
} else { } else {
let not_ready_started_at = *not_ready_since.get_or_insert(now); let not_ready_started_at = *not_ready_since.get_or_insert(now);
let not_ready_for = now.saturating_duration_since(not_ready_started_at); let not_ready_for = now.saturating_duration_since(not_ready_started_at);
@ -97,12 +72,11 @@ pub(crate) async fn configure_admission_gate(
STARTUP_FALLBACK_AFTER STARTUP_FALLBACK_AFTER
}; };
if fallback_enabled && not_ready_for > fallback_after { if fallback_enabled && not_ready_for > fallback_after {
(true, RelayRouteMode::Direct, Some("strict_grace_fallback")) (true, RelayRouteMode::Direct, true)
} else { } else {
(false, RelayRouteMode::Middle, None) (false, RelayRouteMode::Middle, false)
} }
}; };
let next_fallback_active = next_fallback_reason.is_some();
if next_route_mode != route_mode { if next_route_mode != route_mode {
route_mode = next_route_mode; route_mode = next_route_mode;
@ -114,28 +88,17 @@ pub(crate) async fn configure_admission_gate(
"Middle-End routing restored for new sessions" "Middle-End routing restored for new sessions"
); );
} else { } else {
let fallback_reason = next_fallback_reason.unwrap_or("unknown"); let fallback_after = if ready_observed {
if fallback_reason == "strict_grace_fallback" { RUNTIME_FALLBACK_AFTER
let fallback_after = if ready_observed {
RUNTIME_FALLBACK_AFTER
} else {
STARTUP_FALLBACK_AFTER
};
warn!(
target_mode = route_mode.as_str(),
cutover_generation = snapshot.generation,
grace_secs = fallback_after.as_secs(),
fallback_reason,
"ME pool stayed not-ready beyond grace; routing new sessions via Direct-DC"
);
} else { } else {
warn!( STARTUP_FALLBACK_AFTER
target_mode = route_mode.as_str(), };
cutover_generation = snapshot.generation, warn!(
fallback_reason, target_mode = route_mode.as_str(),
"ME pool not-ready; routing new sessions via Direct-DC (fast mode)" cutover_generation = snapshot.generation,
); grace_secs = fallback_after.as_secs(),
} "ME pool stayed not-ready beyond grace; routing new sessions via Direct-DC"
);
} }
} }
} }
@ -145,10 +108,7 @@ pub(crate) async fn configure_admission_gate(
admission_tx_gate.send_replace(gate_open); admission_tx_gate.send_replace(gate_open);
if gate_open { if gate_open {
if next_fallback_active { if next_fallback_active {
warn!( warn!("Conditional-admission gate opened in ME fallback mode");
fallback_reason = next_fallback_reason.unwrap_or("unknown"),
"Conditional-admission gate opened in ME fallback mode"
);
} else { } else {
info!("Conditional-admission gate opened / ME pool READY"); info!("Conditional-admission gate opened / ME pool READY");
} }

View File

@ -45,6 +45,10 @@ const RELAY_IDLE_IO_POLL_MAX: Duration = Duration::from_secs(1);
const TINY_FRAME_DEBT_PER_TINY: u32 = 8; const TINY_FRAME_DEBT_PER_TINY: u32 = 8;
const TINY_FRAME_DEBT_LIMIT: u32 = 512; const TINY_FRAME_DEBT_LIMIT: u32 = 512;
#[cfg(test)] #[cfg(test)]
const C2ME_SEND_TIMEOUT: Duration = Duration::from_millis(50);
#[cfg(not(test))]
const C2ME_SEND_TIMEOUT: Duration = Duration::from_secs(5);
#[cfg(test)]
const RELAY_TEST_STEP_TIMEOUT: Duration = Duration::from_secs(1); const RELAY_TEST_STEP_TIMEOUT: Duration = Duration::from_secs(1);
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1; const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096; const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
@ -644,7 +648,6 @@ pub(crate) fn relay_idle_pressure_test_scope() -> std::sync::MutexGuard<'static,
async fn enqueue_c2me_command( async fn enqueue_c2me_command(
tx: &mpsc::Sender<C2MeCommand>, tx: &mpsc::Sender<C2MeCommand>,
cmd: C2MeCommand, cmd: C2MeCommand,
send_timeout: Option<Duration>,
) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> { ) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
match tx.try_send(cmd) { match tx.try_send(cmd) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
@ -655,18 +658,12 @@ async fn enqueue_c2me_command(
if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS { if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
let reserve_result = match send_timeout { match timeout(C2ME_SEND_TIMEOUT, tx.reserve()).await {
Some(send_timeout) => match timeout(send_timeout, tx.reserve()).await { Ok(Ok(permit)) => {
Ok(result) => result,
Err(_) => return Err(mpsc::error::SendError(cmd)),
},
None => tx.reserve().await,
};
match reserve_result {
Ok(permit) => {
permit.send(cmd); permit.send(cmd);
Ok(()) Ok(())
} }
Ok(Err(_)) => Err(mpsc::error::SendError(cmd)),
Err(_) => Err(mpsc::error::SendError(cmd)), Err(_) => Err(mpsc::error::SendError(cmd)),
} }
} }
@ -792,10 +789,6 @@ where
.general .general
.me_c2me_channel_capacity .me_c2me_channel_capacity
.max(C2ME_CHANNEL_CAPACITY_FALLBACK); .max(C2ME_CHANNEL_CAPACITY_FALLBACK);
let c2me_send_timeout = match config.general.me_c2me_send_timeout_ms {
0 => None,
timeout_ms => Some(Duration::from_millis(timeout_ms)),
};
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity); let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
let me_pool_c2me = me_pool.clone(); let me_pool_c2me = me_pool.clone();
let c2me_sender = tokio::spawn(async move { let c2me_sender = tokio::spawn(async move {
@ -1172,7 +1165,7 @@ where
user = %user, user = %user,
"Middle-relay pressure eviction for idle-candidate session" "Middle-relay pressure eviction for idle-candidate session"
); );
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await; let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
main_result = Err(ProxyError::Proxy( main_result = Err(ProxyError::Proxy(
"middle-relay session evicted under pressure (idle-candidate)".to_string(), "middle-relay session evicted under pressure (idle-candidate)".to_string(),
)); ));
@ -1191,7 +1184,7 @@ where
"Cutover affected middle session, closing client connection" "Cutover affected middle session, closing client connection"
); );
tokio::time::sleep(delay).await; tokio::time::sleep(delay).await;
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await; let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string())); main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
break; break;
} }
@ -1249,12 +1242,8 @@ where
flags |= RPC_FLAG_NOT_ENCRYPTED; flags |= RPC_FLAG_NOT_ENCRYPTED;
} }
// Keep client read loop lightweight: route heavy ME send path via a dedicated task. // Keep client read loop lightweight: route heavy ME send path via a dedicated task.
if enqueue_c2me_command( if enqueue_c2me_command(&c2me_tx, C2MeCommand::Data { payload, flags })
&c2me_tx, .await
C2MeCommand::Data { payload, flags },
c2me_send_timeout,
)
.await
.is_err() .is_err()
{ {
main_result = Err(ProxyError::Proxy("ME sender channel closed".into())); main_result = Err(ProxyError::Proxy("ME sender channel closed".into()));
@ -1264,9 +1253,7 @@ where
Ok(None) => { Ok(None) => {
debug!(conn_id, "Client EOF"); debug!(conn_id, "Client EOF");
client_closed = true; client_closed = true;
let _ = let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout)
.await;
break; break;
} }
Err(e) => { Err(e) => {

View File

@ -126,7 +126,6 @@ async fn c2me_channel_full_path_yields_then_sends() {
payload: make_pooled_payload(&[0xBB, 0xCC]), payload: make_pooled_payload(&[0xBB, 0xCC]),
flags: 2, flags: 2,
}, },
None,
) )
.await .await
}); });

View File

@ -13,7 +13,6 @@ use super::pool::{MePool, RefillDcKey, RefillEndpointKey, WriterContour};
const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20; const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20;
const ME_FLAP_QUARANTINE_SECS: u64 = 25; const ME_FLAP_QUARANTINE_SECS: u64 = 25;
const ME_FLAP_MIN_UPTIME_MILLIS: u64 = 500;
const ME_REFILL_TOTAL_ATTEMPT_CAP: u32 = 20; const ME_REFILL_TOTAL_ATTEMPT_CAP: u32 = 20;
impl MePool { impl MePool {
@ -36,17 +35,6 @@ impl MePool {
uptime: Duration, uptime: Duration,
reason: &'static str, reason: &'static str,
) { ) {
if uptime < Duration::from_millis(ME_FLAP_MIN_UPTIME_MILLIS) {
debug!(
%addr,
reason,
uptime_ms = uptime.as_millis(),
min_uptime_ms = ME_FLAP_MIN_UPTIME_MILLIS,
"Skipping flap quarantine for ultra-short writer lifetime"
);
return;
}
if uptime > Duration::from_secs(ME_FLAP_UPTIME_THRESHOLD_SECS) { if uptime > Duration::from_secs(ME_FLAP_UPTIME_THRESHOLD_SECS) {
return; return;
} }

View File

@ -309,36 +309,6 @@ async fn adversarial_blackhat_single_unexpected_remove_establishes_single_quaran
); );
} }
#[tokio::test]
async fn remove_ultra_short_uptime_writer_skips_flap_quarantine() {
let pool = make_pool().await;
let writer_id = 931;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 12, 0, 131)), 443);
let before_total = pool.stats.get_me_endpoint_quarantine_total();
let before_unexpected = pool.stats.get_me_endpoint_quarantine_unexpected_total();
insert_writer(
&pool,
writer_id,
2,
addr,
false,
Instant::now() - Duration::from_millis(50),
)
.await;
pool.remove_writer_and_close_clients(writer_id).await;
assert!(
!pool.is_endpoint_quarantined(addr).await,
"ultra-short unexpected lifetime must not quarantine endpoint"
);
assert_eq!(pool.stats.get_me_endpoint_quarantine_total(), before_total);
assert_eq!(
pool.stats.get_me_endpoint_quarantine_unexpected_total(),
before_unexpected + 1
);
}
#[tokio::test] #[tokio::test]
async fn integration_old_uptime_writer_does_not_trigger_flap_quarantine() { async fn integration_old_uptime_writer_does_not_trigger_flap_quarantine() {
let pool = make_pool().await; let pool = make_pool().await;