Exclusive Mask + Startup Speed-up

Signed-off-by: Alexey <247128645+axkurcom@users.noreply.github.com>
This commit is contained in:
Alexey
2026-05-19 21:56:26 +03:00
parent 9e877e45c9
commit 914f141715
14 changed files with 529 additions and 109 deletions

View File

@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::watch;
use tokio::sync::{RwLock, watch};
use tracing::{info, warn};
use crate::config::ProxyConfig;
@@ -14,24 +14,32 @@ const RUNTIME_FALLBACK_AFTER: Duration = Duration::from_secs(6);
pub(crate) async fn configure_admission_gate(
config: &Arc<ProxyConfig>,
me_pool: Option<Arc<MePool>>,
me_pool_runtime: Arc<RwLock<Option<Arc<MePool>>>>,
route_runtime: Arc<RouteRuntimeController>,
admission_tx: &watch::Sender<bool>,
config_rx: watch::Receiver<Arc<ProxyConfig>>,
me_ready_rx: watch::Receiver<u64>,
) {
if config.general.use_middle_proxy {
if let Some(pool) = me_pool.as_ref() {
let initial_ready = pool.admission_ready_conditional_cast().await;
if me_pool.is_some() || config.general.me2dc_fallback {
let initial_pool = match me_pool.as_ref() {
Some(pool) => Some(pool.clone()),
None => me_pool_runtime.read().await.clone(),
};
let initial_ready = match initial_pool.as_ref() {
Some(pool) => pool.admission_ready_conditional_cast().await,
None => false,
};
let mut fallback_enabled = config.general.me2dc_fallback;
let mut fast_fallback_enabled = fallback_enabled && config.general.me2dc_fast;
let (initial_gate_open, initial_route_mode, initial_fallback_reason) = if initial_ready
{
(true, RelayRouteMode::Middle, None)
} else if fast_fallback_enabled {
} else if fallback_enabled {
(
true,
RelayRouteMode::Direct,
Some("fast_not_ready_fallback"),
Some("startup_direct_fallback"),
)
} else {
(false, RelayRouteMode::Middle, None)
@@ -49,7 +57,8 @@ pub(crate) async fn configure_admission_gate(
warn!("Conditional-admission gate: closed / ME pool is NOT ready)");
}
let pool_for_gate = pool.clone();
let mut pool_for_gate = initial_pool;
let pool_runtime_for_gate = me_pool_runtime.clone();
let admission_tx_gate = admission_tx.clone();
let route_runtime_gate = route_runtime.clone();
let mut config_rx_gate = config_rx.clone();
@@ -83,12 +92,27 @@ pub(crate) async fn configure_admission_gate(
}
_ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {}
}
let ready = pool_for_gate.admission_ready_conditional_cast().await;
if pool_for_gate.is_none() {
pool_for_gate = pool_runtime_for_gate.read().await.clone();
}
let ready = match pool_for_gate.as_ref() {
Some(pool) => pool.admission_ready_conditional_cast().await,
None => false,
};
let now = Instant::now();
let (next_gate_open, next_route_mode, next_fallback_reason) = if ready {
ready_observed = true;
not_ready_since = None;
if let Some(pool) = pool_for_gate.as_ref() {
pool.set_runtime_ready(true);
}
(true, RelayRouteMode::Middle, None)
} else if fallback_enabled && !ready_observed {
(
true,
RelayRouteMode::Direct,
Some("startup_direct_fallback"),
)
} else if fast_fallback_enabled {
(
true,
@@ -122,7 +146,14 @@ pub(crate) async fn configure_admission_gate(
);
} else {
let fallback_reason = next_fallback_reason.unwrap_or("unknown");
if fallback_reason == "strict_grace_fallback" {
if fallback_reason == "startup_direct_fallback" {
warn!(
target_mode = route_mode.as_str(),
cutover_generation = snapshot.generation,
fallback_reason,
"ME pool not-ready during startup; routing new sessions via Direct-DC"
);
} else if fallback_reason == "strict_grace_fallback" {
let fallback_after = if ready_observed {
RUNTIME_FALLBACK_AFTER
} else {

View File

@@ -6,7 +6,7 @@ use std::time::Duration;
use tokio::net::TcpListener;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::{Semaphore, watch};
use tokio::sync::{RwLock, Semaphore, watch};
use tracing::{debug, error, info, warn};
use crate::config::{ProxyConfig, RstOnCloseMode};
@@ -63,6 +63,7 @@ pub(crate) async fn bind_listeners(
buffer_pool: Arc<BufferPool>,
rng: Arc<SecureRandom>,
me_pool: Option<Arc<MePool>>,
me_pool_runtime: Arc<RwLock<Option<Arc<MePool>>>>,
route_runtime: Arc<RouteRuntimeController>,
tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>,
@@ -236,6 +237,7 @@ pub(crate) async fn bind_listeners(
let buffer_pool = buffer_pool.clone();
let rng = rng.clone();
let me_pool = me_pool.clone();
let me_pool_runtime = me_pool_runtime.clone();
let route_runtime = route_runtime.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
@@ -298,6 +300,7 @@ pub(crate) async fn bind_listeners(
let buffer_pool = buffer_pool.clone();
let rng = rng.clone();
let me_pool = me_pool.clone();
let me_pool_runtime = me_pool_runtime.clone();
let route_runtime = route_runtime.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
@@ -307,7 +310,8 @@ pub(crate) async fn bind_listeners(
tokio::spawn(async move {
let _permit = permit;
if let Err(e) = crate::proxy::client::handle_client_stream_with_shared(
if let Err(e) =
crate::proxy::client::handle_client_stream_with_shared_and_pool_runtime(
stream,
fake_peer,
config,
@@ -317,6 +321,7 @@ pub(crate) async fn bind_listeners(
buffer_pool,
rng,
me_pool,
Some(me_pool_runtime),
route_runtime,
tls_cache,
ip_tracker,
@@ -367,6 +372,7 @@ pub(crate) fn spawn_tcp_accept_loops(
buffer_pool: Arc<BufferPool>,
rng: Arc<SecureRandom>,
me_pool: Option<Arc<MePool>>,
me_pool_runtime: Arc<RwLock<Option<Arc<MePool>>>>,
route_runtime: Arc<RouteRuntimeController>,
tls_cache: Option<Arc<TlsFrontCache>>,
ip_tracker: Arc<UserIpTracker>,
@@ -383,6 +389,7 @@ pub(crate) fn spawn_tcp_accept_loops(
let buffer_pool = buffer_pool.clone();
let rng = rng.clone();
let me_pool = me_pool.clone();
let me_pool_runtime = me_pool_runtime.clone();
let route_runtime = route_runtime.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
@@ -449,6 +456,7 @@ pub(crate) fn spawn_tcp_accept_loops(
let buffer_pool = buffer_pool.clone();
let rng = rng.clone();
let me_pool = me_pool.clone();
let me_pool_runtime = me_pool_runtime.clone();
let route_runtime = route_runtime.clone();
let tls_cache = tls_cache.clone();
let ip_tracker = ip_tracker.clone();
@@ -470,6 +478,7 @@ pub(crate) fn spawn_tcp_accept_loops(
buffer_pool,
rng,
me_pool,
Some(me_pool_runtime),
route_runtime,
tls_cache,
ip_tracker,

View File

@@ -36,10 +36,10 @@ use crate::network::probe::{decide_network_capabilities, log_probe_result, run_p
use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController};
use crate::proxy::shared_state::ProxySharedState;
use crate::startup::{
COMPONENT_API_BOOTSTRAP, COMPONENT_CONFIG_LOAD, COMPONENT_ME_POOL_CONSTRUCT,
COMPONENT_ME_POOL_INIT_STAGE1, COMPONENT_ME_PROXY_CONFIG_V4, COMPONENT_ME_PROXY_CONFIG_V6,
COMPONENT_ME_SECRET_FETCH, COMPONENT_NETWORK_PROBE, COMPONENT_TRACING_INIT, StartupMeStatus,
StartupTracker,
COMPONENT_API_BOOTSTRAP, COMPONENT_CONFIG_LOAD, COMPONENT_DC_CONNECTIVITY_PING,
COMPONENT_ME_CONNECTIVITY_PING, COMPONENT_ME_POOL_CONSTRUCT, COMPONENT_ME_POOL_INIT_STAGE1,
COMPONENT_ME_PROXY_CONFIG_V4, COMPONENT_ME_PROXY_CONFIG_V6, COMPONENT_ME_SECRET_FETCH,
COMPONENT_NETWORK_PROBE, COMPONENT_TRACING_INIT, StartupMeStatus, StartupTracker,
};
use crate::stats::beobachten::BeobachtenStore;
use crate::stats::telemetry::TelemetryPolicy;
@@ -461,12 +461,14 @@ async fn run_telemt_core(
let (api_config_tx, api_config_rx) = watch::channel(Arc::new(config.clone()));
let (detected_ips_tx, detected_ips_rx) = watch::channel((None::<IpAddr>, None::<IpAddr>));
let initial_admission_open = !config.general.use_middle_proxy;
let initial_direct_first =
config.general.use_middle_proxy && config.general.me2dc_fallback;
let initial_admission_open = !config.general.use_middle_proxy || initial_direct_first;
let (admission_tx, admission_rx) = watch::channel(initial_admission_open);
let initial_route_mode = if config.general.use_middle_proxy {
RelayRouteMode::Middle
} else {
let initial_route_mode = if !config.general.use_middle_proxy || initial_direct_first {
RelayRouteMode::Direct
} else {
RelayRouteMode::Middle
};
let route_runtime = Arc::new(RouteRuntimeController::new(initial_route_mode));
let api_me_pool = Arc::new(RwLock::new(None::<Arc<MePool>>));
@@ -602,8 +604,9 @@ async fn run_telemt_core(
let me_init_retry_attempts = config.general.me_init_retry_attempts;
if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me {
if me2dc_fallback {
warn!("No usable IP family for Middle Proxy detected; falling back to direct DC");
use_middle_proxy = false;
warn!(
"No usable IP family for Middle Proxy detected; Direct-DC startup fallback is active while ME init retries continue"
);
} else {
warn!(
"No usable IP family for Middle Proxy detected; me2dc_fallback=false, ME init retries stay active"
@@ -665,23 +668,32 @@ async fn run_telemt_core(
}
let (me_ready_tx, me_ready_rx) = watch::channel(0_u64);
let direct_first_startup = use_middle_proxy && me2dc_fallback;
let me_pool: Option<Arc<MePool>> = me_startup::initialize_me_pool(
use_middle_proxy,
&config,
&decision,
&probe,
&startup_tracker,
upstream_manager.clone(),
rng.clone(),
stats.clone(),
api_me_pool.clone(),
me_ready_tx.clone(),
)
.await;
let me_pool: Option<Arc<MePool>> = if direct_first_startup {
None
} else {
me_startup::initialize_me_pool(
use_middle_proxy,
&config,
&decision,
&probe,
&startup_tracker,
upstream_manager.clone(),
rng.clone(),
stats.clone(),
api_me_pool.clone(),
me_ready_tx.clone(),
)
.await
};
// If ME failed to initialize, force direct-only mode.
if me_pool.is_some() {
if direct_first_startup {
startup_tracker.set_transport_mode("direct").await;
startup_tracker.set_degraded(true).await;
info!("Transport: Direct DC startup fallback active; Middle-End bootstrap continues in background");
} else if me_pool.is_some() {
startup_tracker.set_transport_mode("middle_proxy").await;
startup_tracker.set_degraded(false).await;
info!("Transport: Middle-End Proxy - all DC-over-RPC");
@@ -719,18 +731,33 @@ async fn run_telemt_core(
config.access.cidr_rate_limits.clone(),
);
connectivity::run_startup_connectivity(
&config,
&me_pool,
rng.clone(),
&startup_tracker,
upstream_manager.clone(),
prefer_ipv6,
&decision,
process_started_at,
api_me_pool.clone(),
)
.await;
if direct_first_startup {
startup_tracker
.skip_component(
COMPONENT_ME_CONNECTIVITY_PING,
Some("deferred by direct-first startup".to_string()),
)
.await;
startup_tracker
.skip_component(
COMPONENT_DC_CONNECTIVITY_PING,
Some("background health checks active".to_string()),
)
.await;
} else {
connectivity::run_startup_connectivity(
&config,
&me_pool,
rng.clone(),
&startup_tracker,
upstream_manager.clone(),
prefer_ipv6,
&decision,
process_started_at,
api_me_pool.clone(),
)
.await;
}
let runtime_watches = runtime_tasks::spawn_runtime_tasks(
&config,
@@ -758,9 +785,70 @@ async fn run_telemt_core(
let detected_ip_v4 = runtime_watches.detected_ip_v4;
let detected_ip_v6 = runtime_watches.detected_ip_v6;
if direct_first_startup {
let config_bg = config.clone();
let decision_bg = decision.clone();
let probe_bg = probe.clone();
let startup_tracker_bg = startup_tracker.clone();
let upstream_manager_bg = upstream_manager.clone();
let rng_bg = rng.clone();
let stats_bg = stats.clone();
let api_me_pool_bg = api_me_pool.clone();
let me_ready_tx_bg = me_ready_tx.clone();
let config_rx_bg = config_rx.clone();
tokio::spawn(async move {
let mut bootstrap_attempt: u32 = 0;
loop {
bootstrap_attempt = bootstrap_attempt.saturating_add(1);
let pool = me_startup::initialize_me_pool(
true,
config_bg.as_ref(),
&decision_bg,
&probe_bg,
&startup_tracker_bg,
upstream_manager_bg.clone(),
rng_bg.clone(),
stats_bg.clone(),
api_me_pool_bg.clone(),
me_ready_tx_bg.clone(),
)
.await;
if let Some(pool) = pool {
runtime_tasks::spawn_middle_proxy_runtime_tasks(
config_bg.as_ref(),
config_rx_bg,
pool,
rng_bg,
me_ready_tx_bg,
);
break;
}
if me_init_retry_attempts > 0 && bootstrap_attempt >= me_init_retry_attempts {
break;
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
});
let startup_tracker_ready = startup_tracker.clone();
let api_me_pool_ready = api_me_pool.clone();
let mut me_ready_rx_transport = me_ready_tx.subscribe();
tokio::spawn(async move {
if me_ready_rx_transport.changed().await.is_ok() {
if let Some(pool) = api_me_pool_ready.read().await.as_ref() {
pool.set_runtime_ready(true);
}
startup_tracker_ready.set_transport_mode("middle_proxy").await;
startup_tracker_ready.set_degraded(false).await;
info!("Transport: Middle-End Proxy restored for new sessions");
}
});
}
admission::configure_admission_gate(
&config,
me_pool.clone(),
api_me_pool.clone(),
route_runtime.clone(),
&admission_tx,
config_rx.clone(),
@@ -789,6 +877,7 @@ async fn run_telemt_core(
buffer_pool.clone(),
rng.clone(),
me_pool.clone(),
api_me_pool.clone(),
route_runtime.clone(),
tls_cache.clone(),
ip_tracker.clone(),
@@ -843,6 +932,7 @@ async fn run_telemt_core(
buffer_pool.clone(),
rng.clone(),
me_pool.clone(),
api_me_pool.clone(),
route_runtime.clone(),
tls_cache.clone(),
ip_tracker.clone(),

View File

@@ -257,45 +257,7 @@ pub(crate) async fn spawn_runtime_tasks(
});
if let Some(pool) = me_pool {
let reinit_trigger_capacity = config.general.me_reinit_trigger_channel.max(1);
let (reinit_tx, reinit_rx) = mpsc::channel::<MeReinitTrigger>(reinit_trigger_capacity);
let pool_clone_sched = pool.clone();
let rng_clone_sched = rng.clone();
let config_rx_clone_sched = config_rx.clone();
let me_ready_tx_sched = me_ready_tx.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_reinit_scheduler(
pool_clone_sched,
rng_clone_sched,
config_rx_clone_sched,
reinit_rx,
me_ready_tx_sched,
)
.await;
});
let pool_clone = pool.clone();
let config_rx_clone = config_rx.clone();
let reinit_tx_updater = reinit_tx.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_config_updater(
pool_clone,
config_rx_clone,
reinit_tx_updater,
)
.await;
});
let config_rx_clone_rot = config_rx.clone();
let reinit_tx_rotation = reinit_tx.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_rotation_task(
config_rx_clone_rot,
reinit_tx_rotation,
)
.await;
});
spawn_middle_proxy_runtime_tasks(config, config_rx.clone(), pool, rng, me_ready_tx);
}
RuntimeWatches {
@@ -306,6 +268,51 @@ pub(crate) async fn spawn_runtime_tasks(
}
}
/// Spawns the detached background tasks that service a Middle-End pool.
///
/// Three tasks are started via `tokio::spawn`, in this order: `me_reinit_scheduler`,
/// `me_config_updater` and `me_rotation_task` (all from `crate::transport::middle_proxy`).
/// The updater and the rotation task each hold a sender of a bounded `MeReinitTrigger`
/// channel; the scheduler owns its receiving end.
///
/// # Arguments
///
/// * `config` - read once for `general.me_reinit_trigger_channel`, the trigger channel capacity.
/// * `config_rx` - configuration watch receiver; each of the three tasks gets its own handle.
/// * `pool` - Middle-End pool handle shared by the scheduler and the config updater.
/// * `rng` - random source handed to the scheduler.
/// * `me_ready_tx` - readiness watch sender handed to the scheduler.
///
/// The function returns right after spawning; the join handles are dropped, so the tasks are
/// never awaited here. It is not `async` itself, so it must be called from within a Tokio
/// runtime context for `tokio::spawn` to succeed.
pub(crate) fn spawn_middle_proxy_runtime_tasks(
    config: &ProxyConfig,
    config_rx: watch::Receiver<Arc<ProxyConfig>>,
    pool: Arc<MePool>,
    rng: Arc<SecureRandom>,
    me_ready_tx: watch::Sender<u64>,
) {
    // A bounded Tokio mpsc channel rejects a capacity of 0, so clamp to at least 1.
    let reinit_trigger_capacity = config.general.me_reinit_trigger_channel.max(1);
    let (reinit_tx, reinit_rx) = mpsc::channel::<MeReinitTrigger>(reinit_trigger_capacity);

    // Scheduler: the only consumer of reinit triggers. `rng` and `me_ready_tx` have no other
    // user in this function, so they are moved into the task instead of being cloned.
    let scheduler_pool = Arc::clone(&pool);
    let scheduler_config_rx = config_rx.clone();
    tokio::spawn(async move {
        crate::transport::middle_proxy::me_reinit_scheduler(
            scheduler_pool,
            rng,
            scheduler_config_rx,
            reinit_rx,
            me_ready_tx,
        )
        .await;
    });

    // Config updater: last user of `pool`, so it takes ownership of the original handle.
    let updater_config_rx = config_rx.clone();
    let updater_reinit_tx = reinit_tx.clone();
    tokio::spawn(async move {
        crate::transport::middle_proxy::me_config_updater(
            pool,
            updater_config_rx,
            updater_reinit_tx,
        )
        .await;
    });

    // Rotation task: last user of both `config_rx` and `reinit_tx`; move the originals in.
    tokio::spawn(async move {
        crate::transport::middle_proxy::me_rotation_task(config_rx, reinit_tx).await;
    });
}
pub(crate) async fn apply_runtime_log_filter(
has_rust_log: bool,
effective_log_level: &LogLevel,