diff --git a/src/maestro/admission.rs b/src/maestro/admission.rs index 82484ad..e9b1cbc 100644 --- a/src/maestro/admission.rs +++ b/src/maestro/admission.rs @@ -17,6 +17,7 @@ pub(crate) async fn configure_admission_gate( route_runtime: Arc, admission_tx: &watch::Sender, config_rx: watch::Receiver>, + me_ready_rx: watch::Receiver, ) { if config.general.use_middle_proxy { if let Some(pool) = me_pool.as_ref() { @@ -52,6 +53,7 @@ pub(crate) async fn configure_admission_gate( let admission_tx_gate = admission_tx.clone(); let route_runtime_gate = route_runtime.clone(); let mut config_rx_gate = config_rx.clone(); + let mut me_ready_rx_gate = me_ready_rx; let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1); tokio::spawn(async move { let mut gate_open = initial_gate_open; @@ -74,6 +76,11 @@ pub(crate) async fn configure_admission_gate( fast_fallback_enabled = cfg.general.me2dc_fallback && cfg.general.me2dc_fast; continue; } + changed = me_ready_rx_gate.changed() => { + if changed.is_err() { + break; + } + } _ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {} } let ready = pool_for_gate.admission_ready_conditional_cast().await; diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 7002cec..9dde7fa 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::time::Duration; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, watch}; use tracing::{error, info, warn}; use crate::config::ProxyConfig; @@ -29,6 +29,7 @@ pub(crate) async fn initialize_me_pool( rng: Arc, stats: Arc, api_me_pool: Arc>>>, + me_ready_tx: watch::Sender, ) -> Option> { if !use_middle_proxy { return None; @@ -314,6 +315,7 @@ pub(crate) async fn initialize_me_pool( let pool_bg = pool.clone(); let rng_bg = rng.clone(); let startup_tracker_bg = startup_tracker.clone(); + let me_ready_tx_bg = me_ready_tx.clone(); let retry_limit = if me_init_retry_attempts == 0 { String::from("unlimited") } else { @@ -347,6 +349,9 @@ pub(crate) async fn initialize_me_pool( startup_tracker_bg .set_me_status(StartupMeStatus::Ready, "ready") .await; + me_ready_tx_bg.send_modify(|version| { + *version = version.saturating_add(1); + }); info!( attempt = init_attempt, "Middle-End pool initialized successfully" @@ -474,6 +479,9 @@ pub(crate) async fn initialize_me_pool( startup_tracker .set_me_status(StartupMeStatus::Ready, "ready") .await; + me_ready_tx.send_modify(|version| { + *version = version.saturating_add(1); + }); info!( attempt = init_attempt, "Middle-End pool initialized successfully" diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index e159e25..1792f21 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -660,6 +660,8 @@ async fn run_telemt_core( .await; } + let (me_ready_tx, me_ready_rx) = watch::channel(0_u64); + let me_pool: Option> = me_startup::initialize_me_pool( use_middle_proxy, &config, @@ -670,6 +672,7 @@ async fn run_telemt_core( rng.clone(), stats.clone(), api_me_pool.clone(), + me_ready_tx.clone(), ) .await; @@ -743,6 +746,7 @@ async fn run_telemt_core( api_config_tx.clone(), me_pool.clone(), shared_state.clone(), + me_ready_tx.clone(), ) .await; let config_rx = runtime_watches.config_rx; @@ -756,6 +760,7 @@ async fn run_telemt_core( route_runtime.clone(), &admission_tx, config_rx.clone(), + me_ready_rx, ) .await; let _admission_tx_hold = admission_tx; diff --git a/src/maestro/runtime_tasks.rs b/src/maestro/runtime_tasks.rs index b54e9f9..3637694 100644 --- a/src/maestro/runtime_tasks.rs +++ b/src/maestro/runtime_tasks.rs @@ -53,6 +53,7 @@ pub(crate) async fn spawn_runtime_tasks( api_config_tx: watch::Sender>, me_pool_for_policy: Option>, shared_state: Arc, + me_ready_tx: watch::Sender, ) -> RuntimeWatches { let um_clone = upstream_manager.clone(); let dc_overrides_for_health = config.dc_overrides.clone(); @@ -250,12 +251,14 @@ pub(crate) async fn spawn_runtime_tasks( let pool_clone_sched = pool.clone(); let rng_clone_sched = rng.clone(); let config_rx_clone_sched = config_rx.clone(); + let me_ready_tx_sched = me_ready_tx.clone(); tokio::spawn(async move { crate::transport::middle_proxy::me_reinit_scheduler( pool_clone_sched, rng_clone_sched, config_rx_clone_sched, reinit_rx, + me_ready_tx_sched, ) .await; }); diff --git a/src/transport/middle_proxy/pool_reinit.rs b/src/transport/middle_proxy/pool_reinit.rs index db6411c..5b0d7d9 100644 --- a/src/transport/middle_proxy/pool_reinit.rs +++ b/src/transport/middle_proxy/pool_reinit.rs @@ -365,7 +365,10 @@ impl MePool { } } - pub async fn zero_downtime_reinit_after_map_change(self: &Arc, rng: &SecureRandom) { + pub async fn zero_downtime_reinit_after_map_change( + self: &Arc, + rng: &SecureRandom, + ) -> bool { let desired_by_dc = self.desired_dc_endpoints().await; let now_epoch_secs = Self::now_epoch_secs(); let v4_suppressed = self.is_family_temporarily_suppressed(IpFamily::V4, now_epoch_secs); @@ -380,7 +383,7 @@ impl MePool { MeDrainGateReason::CoverageQuorum }; self.set_last_drain_gate(false, false, reason, now_epoch_secs); - return; + return false; } let desired_map_hash = Self::desired_map_hash(&desired_by_dc); @@ -490,7 +493,7 @@ impl MePool { missing_dc = ?missing_dc, "ME reinit coverage below threshold; keeping stale writers" ); - return; + return false; } if hardswap { @@ -520,7 +523,7 @@ impl MePool { missing_dc = ?fresh_missing_dc, "ME hardswap pending: fresh generation DC coverage incomplete" ); - return; + return false; } } @@ -567,7 +570,7 @@ impl MePool { self.clear_pending_hardswap_state(); } debug!("ME reinit cycle completed with no stale writers"); - return; + return true; } let drain_timeout = self.force_close_timeout(); @@ -606,10 +609,11 @@ impl MePool { if hardswap { self.clear_pending_hardswap_state(); } + true } - pub async fn zero_downtime_reinit_periodic(self: &Arc, rng: &SecureRandom) { - self.zero_downtime_reinit_after_map_change(rng).await; + pub async fn zero_downtime_reinit_periodic(self: &Arc, rng: &SecureRandom) -> bool { + self.zero_downtime_reinit_after_map_change(rng).await } } diff --git a/src/transport/middle_proxy/rotation.rs b/src/transport/middle_proxy/rotation.rs index ffc7a9d..9308226 100644 --- a/src/transport/middle_proxy/rotation.rs +++ b/src/transport/middle_proxy/rotation.rs @@ -47,6 +47,7 @@ pub async fn me_reinit_scheduler( rng: Arc, config_rx: watch::Receiver>, mut trigger_rx: mpsc::Receiver, + me_ready_tx: watch::Sender, ) { info!("ME reinit scheduler started"); loop { @@ -90,15 +91,25 @@ pub async fn me_reinit_scheduler( if cfg.general.me_reinit_singleflight { debug!(reason, "ME reinit scheduled (single-flight)"); - pool.zero_downtime_reinit_periodic(rng.as_ref()).await; + if pool.zero_downtime_reinit_periodic(rng.as_ref()).await { + me_ready_tx.send_modify(|version| { + *version = version.saturating_add(1); + }); + } } else { debug!(reason, "ME reinit scheduled (concurrent mode)"); let pool_clone = pool.clone(); let rng_clone = rng.clone(); + let me_ready_tx_clone = me_ready_tx.clone(); tokio::spawn(async move { - pool_clone + if pool_clone .zero_downtime_reinit_periodic(rng_clone.as_ref()) - .await; + .await + { + me_ready_tx_clone.send_modify(|version| { + *version = version.saturating_add(1); + }); + } }); } }