Event-driven Wakeup for ME Admission-gate

Alexey
2026-05-08 13:34:41 +03:00
parent 658a565cb3
commit 86573be493
6 changed files with 49 additions and 11 deletions

View File

@@ -17,6 +17,7 @@ pub(crate) async fn configure_admission_gate(
     route_runtime: Arc<RouteRuntimeController>,
     admission_tx: &watch::Sender<bool>,
     config_rx: watch::Receiver<Arc<ProxyConfig>>,
+    me_ready_rx: watch::Receiver<u64>,
 ) {
     if config.general.use_middle_proxy {
         if let Some(pool) = me_pool.as_ref() {
@@ -52,6 +53,7 @@ pub(crate) async fn configure_admission_gate(
     let admission_tx_gate = admission_tx.clone();
     let route_runtime_gate = route_runtime.clone();
     let mut config_rx_gate = config_rx.clone();
+    let mut me_ready_rx_gate = me_ready_rx;
     let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1);
     tokio::spawn(async move {
         let mut gate_open = initial_gate_open;
@@ -74,6 +76,11 @@ pub(crate) async fn configure_admission_gate(
                     fast_fallback_enabled = cfg.general.me2dc_fallback && cfg.general.me2dc_fast;
                     continue;
                 }
+                changed = me_ready_rx_gate.changed() => {
+                    if changed.is_err() {
+                        break;
+                    }
+                }
                 _ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {}
             }
             let ready = pool_for_gate.admission_ready_conditional_cast().await;
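
The change above gives the gate loop an event-driven wakeup path next to its existing timer: previously the gate re-checked admission only on config updates or once per me_admission_poll_ms, so a pool that became ready mid-interval waited out the rest of the interval. A minimal, self-contained sketch of the pattern, where gate_loop and check_ready are illustrative names standing in for the real gate task and admission_ready_conditional_cast:

use std::time::Duration;
use tokio::sync::watch;

async fn gate_loop(mut me_ready_rx: watch::Receiver<u64>, poll_ms: u64) {
    loop {
        tokio::select! {
            // Event-driven path: fires as soon as a producer bumps the
            // readiness version, so the gate reacts without waiting.
            changed = me_ready_rx.changed() => {
                if changed.is_err() {
                    // Every sender is gone; no more readiness events.
                    break;
                }
            }
            // Polling fallback: keeps the old periodic re-check as a
            // safety net when no event arrives.
            _ = tokio::time::sleep(Duration::from_millis(poll_ms)) => {}
        }
        // Both arms fall through to the same readiness evaluation.
        let _gate_open = check_ready().await;
    }
}

async fn check_ready() -> bool {
    true // placeholder for the real admission probe
}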

View File

@@ -3,7 +3,7 @@
 use std::sync::Arc;
 use std::time::Duration;
 
-use tokio::sync::RwLock;
+use tokio::sync::{RwLock, watch};
 use tracing::{error, info, warn};
 
 use crate::config::ProxyConfig;
@@ -29,6 +29,7 @@ pub(crate) async fn initialize_me_pool(
     rng: Arc<SecureRandom>,
     stats: Arc<Stats>,
     api_me_pool: Arc<RwLock<Option<Arc<MePool>>>>,
+    me_ready_tx: watch::Sender<u64>,
 ) -> Option<Arc<MePool>> {
     if !use_middle_proxy {
         return None;
@@ -314,6 +315,7 @@ pub(crate) async fn initialize_me_pool(
     let pool_bg = pool.clone();
     let rng_bg = rng.clone();
     let startup_tracker_bg = startup_tracker.clone();
+    let me_ready_tx_bg = me_ready_tx.clone();
     let retry_limit = if me_init_retry_attempts == 0 {
         String::from("unlimited")
     } else {
@@ -347,6 +349,9 @@ pub(crate) async fn initialize_me_pool(
                 startup_tracker_bg
                     .set_me_status(StartupMeStatus::Ready, "ready")
                     .await;
+                me_ready_tx_bg.send_modify(|version| {
+                    *version = version.saturating_add(1);
+                });
                 info!(
                     attempt = init_attempt,
                     "Middle-End pool initialized successfully"
@@ -474,6 +479,9 @@ pub(crate) async fn initialize_me_pool(
         startup_tracker
             .set_me_status(StartupMeStatus::Ready, "ready")
             .await;
+        me_ready_tx.send_modify(|version| {
+            *version = version.saturating_add(1);
+        });
         info!(
             attempt = init_attempt,
             "Middle-End pool initialized successfully"

View File

@@ -660,6 +660,8 @@ async fn run_telemt_core(
         .await;
     }
 
+    let (me_ready_tx, me_ready_rx) = watch::channel(0_u64);
+
     let me_pool: Option<Arc<MePool>> = me_startup::initialize_me_pool(
         use_middle_proxy,
         &config,
@@ -670,6 +672,7 @@ async fn run_telemt_core(
         rng.clone(),
         stats.clone(),
         api_me_pool.clone(),
+        me_ready_tx.clone(),
     )
     .await;
 
@@ -743,6 +746,7 @@ async fn run_telemt_core(
         api_config_tx.clone(),
         me_pool.clone(),
         shared_state.clone(),
+        me_ready_tx.clone(),
     )
     .await;
 
@@ -756,6 +760,7 @@ async fn run_telemt_core(
         route_runtime.clone(),
         &admission_tx,
         config_rx.clone(),
+        me_ready_rx,
     )
     .await;
     let _admission_tx_hold = admission_tx;
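
The wiring is a single channel threaded through the call graph: the Sender half is cloned into every path that can bring the pool up (initial startup here, the reinit scheduler below), while the lone Receiver ends up in configure_admission_gate. A condensed, runnable sketch of that fan-in/fan-out shape, with stand-in task bodies:

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // One channel, created before any producer or consumer starts.
    let (me_ready_tx, mut me_ready_rx) = watch::channel(0_u64);

    // Producer: stand-in for a successful pool (re)initialization.
    let startup_tx = me_ready_tx.clone();
    tokio::spawn(async move {
        startup_tx.send_modify(|v| *v = v.saturating_add(1));
    });

    // Consumer: stand-in for the admission gate's event arm.
    if me_ready_rx.changed().await.is_ok() {
        println!("ME readiness event, version {}", *me_ready_rx.borrow());
    }
}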

View File

@@ -53,6 +53,7 @@ pub(crate) async fn spawn_runtime_tasks(
     api_config_tx: watch::Sender<Arc<ProxyConfig>>,
     me_pool_for_policy: Option<Arc<MePool>>,
     shared_state: Arc<ProxySharedState>,
+    me_ready_tx: watch::Sender<u64>,
 ) -> RuntimeWatches {
     let um_clone = upstream_manager.clone();
     let dc_overrides_for_health = config.dc_overrides.clone();
@@ -250,12 +251,14 @@ pub(crate) async fn spawn_runtime_tasks(
         let pool_clone_sched = pool.clone();
         let rng_clone_sched = rng.clone();
         let config_rx_clone_sched = config_rx.clone();
+        let me_ready_tx_sched = me_ready_tx.clone();
         tokio::spawn(async move {
             crate::transport::middle_proxy::me_reinit_scheduler(
                 pool_clone_sched,
                 rng_clone_sched,
                 config_rx_clone_sched,
                 reinit_rx,
+                me_ready_tx_sched,
             )
             .await;
         });

View File

@@ -365,7 +365,10 @@ impl MePool {
         }
     }
 
-    pub async fn zero_downtime_reinit_after_map_change(self: &Arc<Self>, rng: &SecureRandom) {
+    pub async fn zero_downtime_reinit_after_map_change(
+        self: &Arc<Self>,
+        rng: &SecureRandom,
+    ) -> bool {
         let desired_by_dc = self.desired_dc_endpoints().await;
         let now_epoch_secs = Self::now_epoch_secs();
         let v4_suppressed = self.is_family_temporarily_suppressed(IpFamily::V4, now_epoch_secs);
@@ -380,7 +383,7 @@ impl MePool {
                 MeDrainGateReason::CoverageQuorum
             };
             self.set_last_drain_gate(false, false, reason, now_epoch_secs);
-            return;
+            return false;
         }
 
         let desired_map_hash = Self::desired_map_hash(&desired_by_dc);
@@ -490,7 +493,7 @@ impl MePool {
                 missing_dc = ?missing_dc,
                 "ME reinit coverage below threshold; keeping stale writers"
             );
-            return;
+            return false;
         }
 
         if hardswap {
@@ -520,7 +523,7 @@ impl MePool {
                     missing_dc = ?fresh_missing_dc,
                     "ME hardswap pending: fresh generation DC coverage incomplete"
                 );
-                return;
+                return false;
             }
         }
 
@@ -567,7 +570,7 @@ impl MePool {
                 self.clear_pending_hardswap_state();
             }
             debug!("ME reinit cycle completed with no stale writers");
-            return;
+            return true;
         }
 
         let drain_timeout = self.force_close_timeout();
@@ -606,10 +609,11 @@ impl MePool {
         if hardswap {
             self.clear_pending_hardswap_state();
         }
 
+        true
     }
 
-    pub async fn zero_downtime_reinit_periodic(self: &Arc<Self>, rng: &SecureRandom) {
-        self.zero_downtime_reinit_after_map_change(rng).await;
+    pub async fn zero_downtime_reinit_periodic(self: &Arc<Self>, rng: &SecureRandom) -> bool {
+        self.zero_downtime_reinit_after_map_change(rng).await
     }
 }
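
The new bool return separates a completed cycle from a gated early exit: family suppression, a failed coverage quorum, and a pending hardswap with incomplete fresh coverage all return false, while both completion paths return true. A generic caller sketch of that contract, where the Future<Output = bool> stands in for zero_downtime_reinit_periodic:

use std::future::Future;
use tokio::sync::watch;

// Run a reinit cycle and emit a readiness event only if it completed.
async fn reinit_and_notify<F>(reinit: F, me_ready_tx: &watch::Sender<u64>)
where
    F: Future<Output = bool>,
{
    // A gated early exit (false) cannot have changed admission readiness,
    // so the gate is woken only for cycles that actually swapped writers.
    if reinit.await {
        me_ready_tx.send_modify(|v| *v = v.saturating_add(1));
    }
}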

View File

@@ -47,6 +47,7 @@ pub async fn me_reinit_scheduler(
     rng: Arc<SecureRandom>,
     config_rx: watch::Receiver<Arc<ProxyConfig>>,
     mut trigger_rx: mpsc::Receiver<MeReinitTrigger>,
+    me_ready_tx: watch::Sender<u64>,
 ) {
     info!("ME reinit scheduler started");
     loop {
@@ -90,15 +91,25 @@ pub async fn me_reinit_scheduler(
         if cfg.general.me_reinit_singleflight {
             debug!(reason, "ME reinit scheduled (single-flight)");
-            pool.zero_downtime_reinit_periodic(rng.as_ref()).await;
+            if pool.zero_downtime_reinit_periodic(rng.as_ref()).await {
+                me_ready_tx.send_modify(|version| {
+                    *version = version.saturating_add(1);
+                });
+            }
         } else {
             debug!(reason, "ME reinit scheduled (concurrent mode)");
             let pool_clone = pool.clone();
             let rng_clone = rng.clone();
+            let me_ready_tx_clone = me_ready_tx.clone();
             tokio::spawn(async move {
-                pool_clone
-                    .zero_downtime_reinit_periodic(rng_clone.as_ref())
-                    .await;
+                if pool_clone
+                    .zero_downtime_reinit_periodic(rng_clone.as_ref())
+                    .await
+                {
+                    me_ready_tx_clone.send_modify(|version| {
+                        *version = version.saturating_add(1);
+                    });
+                }
             });
         }
     }
 
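
One design note on the bump itself: send_modify is used instead of send because watch::Sender::send fails once every receiver has been dropped, while send_modify applies the update and notifies unconditionally. Neither the startup path nor the scheduler therefore has to care whether the gate task is (still) running. A small demonstration of the difference, assuming only tokio's watch API:

use tokio::sync::watch;

fn main() {
    let (tx, rx) = watch::channel(0_u64);
    drop(rx); // no receivers left

    // send reports an error once the channel has no receivers,
    // and the rejected value is not stored.
    assert!(tx.send(1).is_err());
    assert_eq!(*tx.borrow(), 0);

    // send_modify still applies the update unconditionally, which is why
    // the producers in this commit can bump the version with no error path.
    tx.send_modify(|v| *v = v.saturating_add(1));
    assert_eq!(*tx.borrow(), 1);
}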