diff --git a/src/api/mod.rs b/src/api/mod.rs index a705a46..63fafad 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -11,11 +11,12 @@ use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Method, Request, Response, StatusCode}; use tokio::net::TcpListener; -use tokio::sync::{Mutex, watch}; +use tokio::sync::{Mutex, RwLock, watch}; use tracing::{debug, info, warn}; use crate::config::ProxyConfig; use crate::ip_tracker::UserIpTracker; +use crate::startup::StartupTracker; use crate::stats::Stats; use crate::transport::middle_proxy::MePool; use crate::transport::UpstreamManager; @@ -25,6 +26,7 @@ mod events; mod http_utils; mod model; mod runtime_edge; +mod runtime_init; mod runtime_min; mod runtime_stats; mod runtime_watch; @@ -41,6 +43,7 @@ use runtime_edge::{ EdgeConnectionsCacheEntry, build_runtime_connections_summary_data, build_runtime_events_recent_data, }; +use runtime_init::build_runtime_initialization_data; use runtime_min::{ build_runtime_me_pool_state_data, build_runtime_me_quality_data, build_runtime_nat_stun_data, build_runtime_upstream_quality_data, build_security_whitelist_data, @@ -67,7 +70,7 @@ pub(super) struct ApiRuntimeState { pub(super) struct ApiShared { pub(super) stats: Arc, pub(super) ip_tracker: Arc, - pub(super) me_pool: Option>, + pub(super) me_pool: Arc>>>, pub(super) upstream_manager: Arc, pub(super) config_path: PathBuf, pub(super) startup_detected_ip_v4: Option, @@ -79,6 +82,7 @@ pub(super) struct ApiShared { pub(super) runtime_events: Arc, pub(super) request_id: Arc, pub(super) runtime_state: Arc, + pub(super) startup_tracker: Arc, } impl ApiShared { @@ -91,7 +95,7 @@ pub async fn serve( listen: SocketAddr, stats: Arc, ip_tracker: Arc, - me_pool: Option>, + me_pool: Arc>>>, upstream_manager: Arc, config_rx: watch::Receiver>, admission_rx: watch::Receiver, @@ -99,6 +103,7 @@ pub async fn serve( startup_detected_ip_v4: Option, startup_detected_ip_v6: Option, process_started_at_epoch_secs: u64, + startup_tracker: Arc, ) { let listener = match TcpListener::bind(listen).await { Ok(listener) => listener, @@ -138,6 +143,7 @@ pub async fn serve( )), request_id: Arc::new(AtomicU64::new(1)), runtime_state: runtime_state.clone(), + startup_tracker, }); spawn_runtime_watchers( @@ -248,7 +254,12 @@ async fn handle( } ("GET", "/v1/runtime/gates") => { let revision = current_revision(&shared.config_path).await?; - let data = build_runtime_gates_data(shared.as_ref(), cfg.as_ref()); + let data = build_runtime_gates_data(shared.as_ref(), cfg.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/runtime/initialization") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_initialization_data(shared.as_ref()).await; Ok(success_response(StatusCode::OK, data, revision)) } ("GET", "/v1/limits/effective") => { diff --git a/src/api/model.rs b/src/api/model.rs index 88c6ddc..fd678f6 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -318,11 +318,21 @@ pub(super) struct MinimalMeRuntimeData { pub(super) adaptive_floor_cpu_cores_override: u16, pub(super) adaptive_floor_max_extra_writers_single_per_core: u16, pub(super) adaptive_floor_max_extra_writers_multi_per_core: u16, + pub(super) adaptive_floor_max_active_writers_per_core: u16, + pub(super) adaptive_floor_max_warm_writers_per_core: u16, + pub(super) adaptive_floor_max_active_writers_global: u32, + pub(super) adaptive_floor_max_warm_writers_global: u32, pub(super) adaptive_floor_cpu_cores_detected: u32, pub(super) adaptive_floor_cpu_cores_effective: u32, pub(super) adaptive_floor_global_cap_raw: u64, pub(super) adaptive_floor_global_cap_effective: u64, pub(super) adaptive_floor_target_writers_total: u64, + pub(super) adaptive_floor_active_cap_configured: u64, + pub(super) adaptive_floor_active_cap_effective: u64, + pub(super) adaptive_floor_warm_cap_configured: u64, + pub(super) adaptive_floor_warm_cap_effective: u64, + pub(super) adaptive_floor_active_writers_current: u64, + pub(super) adaptive_floor_warm_writers_current: u64, pub(super) me_keepalive_enabled: bool, pub(super) me_keepalive_interval_secs: u64, pub(super) me_keepalive_jitter_secs: u64, diff --git a/src/api/runtime_init.rs b/src/api/runtime_init.rs new file mode 100644 index 0000000..4bd8943 --- /dev/null +++ b/src/api/runtime_init.rs @@ -0,0 +1,186 @@ +use serde::Serialize; + +use crate::startup::{ + COMPONENT_ME_CONNECTIVITY_PING, COMPONENT_ME_POOL_CONSTRUCT, COMPONENT_ME_POOL_INIT_STAGE1, + COMPONENT_ME_PROXY_CONFIG_V4, COMPONENT_ME_PROXY_CONFIG_V6, COMPONENT_ME_SECRET_FETCH, + StartupComponentStatus, StartupMeStatus, compute_progress_pct, +}; + +use super::ApiShared; + +#[derive(Serialize)] +pub(super) struct RuntimeInitializationComponentData { + pub(super) id: &'static str, + pub(super) title: &'static str, + pub(super) status: &'static str, + pub(super) started_at_epoch_ms: Option, + pub(super) finished_at_epoch_ms: Option, + pub(super) duration_ms: Option, + pub(super) attempts: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) details: Option, +} + +#[derive(Serialize)] +pub(super) struct RuntimeInitializationMeData { + pub(super) status: &'static str, + pub(super) current_stage: String, + pub(super) progress_pct: f64, + pub(super) init_attempt: u32, + pub(super) retry_limit: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) last_error: Option, +} + +#[derive(Serialize)] +pub(super) struct RuntimeInitializationData { + pub(super) status: &'static str, + pub(super) degraded: bool, + pub(super) current_stage: String, + pub(super) progress_pct: f64, + pub(super) started_at_epoch_secs: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) ready_at_epoch_secs: Option, + pub(super) total_elapsed_ms: u64, + pub(super) transport_mode: String, + pub(super) me: RuntimeInitializationMeData, + pub(super) components: Vec, +} + +#[derive(Clone)] +pub(super) struct RuntimeStartupSummaryData { + pub(super) status: &'static str, + pub(super) stage: String, + pub(super) progress_pct: f64, +} + +pub(super) async fn build_runtime_startup_summary(shared: &ApiShared) -> RuntimeStartupSummaryData { + let snapshot = shared.startup_tracker.snapshot().await; + let me_pool_progress = current_me_pool_stage_progress(shared).await; + let progress_pct = compute_progress_pct(&snapshot, me_pool_progress); + RuntimeStartupSummaryData { + status: snapshot.status.as_str(), + stage: snapshot.current_stage, + progress_pct, + } +} + +pub(super) async fn build_runtime_initialization_data( + shared: &ApiShared, +) -> RuntimeInitializationData { + let snapshot = shared.startup_tracker.snapshot().await; + let me_pool_progress = current_me_pool_stage_progress(shared).await; + let progress_pct = compute_progress_pct(&snapshot, me_pool_progress); + let me_progress_pct = compute_me_progress_pct(&snapshot, me_pool_progress); + + RuntimeInitializationData { + status: snapshot.status.as_str(), + degraded: snapshot.degraded, + current_stage: snapshot.current_stage, + progress_pct, + started_at_epoch_secs: snapshot.started_at_epoch_secs, + ready_at_epoch_secs: snapshot.ready_at_epoch_secs, + total_elapsed_ms: snapshot.total_elapsed_ms, + transport_mode: snapshot.transport_mode, + me: RuntimeInitializationMeData { + status: snapshot.me.status.as_str(), + current_stage: snapshot.me.current_stage, + progress_pct: me_progress_pct, + init_attempt: snapshot.me.init_attempt, + retry_limit: snapshot.me.retry_limit, + last_error: snapshot.me.last_error, + }, + components: snapshot + .components + .into_iter() + .map(|component| RuntimeInitializationComponentData { + id: component.id, + title: component.title, + status: component.status.as_str(), + started_at_epoch_ms: component.started_at_epoch_ms, + finished_at_epoch_ms: component.finished_at_epoch_ms, + duration_ms: component.duration_ms, + attempts: component.attempts, + details: component.details, + }) + .collect(), + } +} + +fn compute_me_progress_pct( + snapshot: &crate::startup::StartupSnapshot, + me_pool_progress: Option, +) -> f64 { + match snapshot.me.status { + StartupMeStatus::Pending => 0.0, + StartupMeStatus::Ready | StartupMeStatus::Failed | StartupMeStatus::Skipped => 100.0, + StartupMeStatus::Initializing => { + let mut total_weight = 0.0f64; + let mut completed_weight = 0.0f64; + for component in &snapshot.components { + if !is_me_component(component.id) { + continue; + } + total_weight += component.weight; + let unit_progress = match component.status { + StartupComponentStatus::Pending => 0.0, + StartupComponentStatus::Running => { + if component.id == COMPONENT_ME_POOL_INIT_STAGE1 { + me_pool_progress.unwrap_or(0.0).clamp(0.0, 1.0) + } else { + 0.0 + } + } + StartupComponentStatus::Ready + | StartupComponentStatus::Failed + | StartupComponentStatus::Skipped => 1.0, + }; + completed_weight += component.weight * unit_progress; + } + if total_weight <= f64::EPSILON { + 0.0 + } else { + ((completed_weight / total_weight) * 100.0).clamp(0.0, 100.0) + } + } + } +} + +fn is_me_component(component_id: &str) -> bool { + matches!( + component_id, + COMPONENT_ME_SECRET_FETCH + | COMPONENT_ME_PROXY_CONFIG_V4 + | COMPONENT_ME_PROXY_CONFIG_V6 + | COMPONENT_ME_POOL_CONSTRUCT + | COMPONENT_ME_POOL_INIT_STAGE1 + | COMPONENT_ME_CONNECTIVITY_PING + ) +} + +async fn current_me_pool_stage_progress(shared: &ApiShared) -> Option { + let snapshot = shared.startup_tracker.snapshot().await; + if snapshot.me.status != StartupMeStatus::Initializing { + return None; + } + + let pool = shared.me_pool.read().await.clone()?; + let status = pool.api_status_snapshot().await; + let configured_dc_groups = status.configured_dc_groups; + let covered_dc_groups = status + .dcs + .iter() + .filter(|dc| dc.alive_writers > 0) + .count(); + + let dc_coverage = ratio_01(covered_dc_groups, configured_dc_groups); + let writer_coverage = ratio_01(status.alive_writers, status.required_writers); + Some((0.7 * dc_coverage + 0.3 * writer_coverage).clamp(0.0, 1.0)) +} + +fn ratio_01(part: usize, total: usize) -> f64 { + if total == 0 { + return 0.0; + } + ((part as f64) / (total as f64)).clamp(0.0, 1.0) +} diff --git a/src/api/runtime_min.rs b/src/api/runtime_min.rs index 96270df..d3066a3 100644 --- a/src/api/runtime_min.rs +++ b/src/api/runtime_min.rs @@ -260,7 +260,7 @@ pub(super) fn build_security_whitelist_data(cfg: &ProxyConfig) -> SecurityWhitel pub(super) async fn build_runtime_me_pool_state_data(shared: &ApiShared) -> RuntimeMePoolStateData { let now_epoch_secs = now_epoch_secs(); - let Some(pool) = &shared.me_pool else { + let Some(pool) = shared.me_pool.read().await.clone() else { return RuntimeMePoolStateData { enabled: false, reason: Some(SOURCE_UNAVAILABLE_REASON), @@ -350,7 +350,7 @@ pub(super) async fn build_runtime_me_pool_state_data(shared: &ApiShared) -> Runt pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> RuntimeMeQualityData { let now_epoch_secs = now_epoch_secs(); - let Some(pool) = &shared.me_pool else { + let Some(pool) = shared.me_pool.read().await.clone() else { return RuntimeMeQualityData { enabled: false, reason: Some(SOURCE_UNAVAILABLE_REASON), @@ -486,7 +486,7 @@ pub(super) async fn build_runtime_upstream_quality_data( pub(super) async fn build_runtime_nat_stun_data(shared: &ApiShared) -> RuntimeNatStunData { let now_epoch_secs = now_epoch_secs(); - let Some(pool) = &shared.me_pool else { + let Some(pool) = shared.me_pool.read().await.clone() else { return RuntimeNatStunData { enabled: false, reason: Some(SOURCE_UNAVAILABLE_REASON), diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index c69f817..f90abe3 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -297,7 +297,7 @@ async fn get_minimal_payload_cached( } } - let pool = shared.me_pool.as_ref()?; + let pool = shared.me_pool.read().await.clone()?; let status = pool.api_status_snapshot().await; let runtime = pool.api_runtime_snapshot().await; let generated_at_epoch_secs = status.generated_at_epoch_secs; @@ -380,11 +380,25 @@ async fn get_minimal_payload_cached( .adaptive_floor_max_extra_writers_single_per_core, adaptive_floor_max_extra_writers_multi_per_core: runtime .adaptive_floor_max_extra_writers_multi_per_core, + adaptive_floor_max_active_writers_per_core: runtime + .adaptive_floor_max_active_writers_per_core, + adaptive_floor_max_warm_writers_per_core: runtime + .adaptive_floor_max_warm_writers_per_core, + adaptive_floor_max_active_writers_global: runtime + .adaptive_floor_max_active_writers_global, + adaptive_floor_max_warm_writers_global: runtime + .adaptive_floor_max_warm_writers_global, adaptive_floor_cpu_cores_detected: runtime.adaptive_floor_cpu_cores_detected, adaptive_floor_cpu_cores_effective: runtime.adaptive_floor_cpu_cores_effective, adaptive_floor_global_cap_raw: runtime.adaptive_floor_global_cap_raw, adaptive_floor_global_cap_effective: runtime.adaptive_floor_global_cap_effective, adaptive_floor_target_writers_total: runtime.adaptive_floor_target_writers_total, + adaptive_floor_active_cap_configured: runtime.adaptive_floor_active_cap_configured, + adaptive_floor_active_cap_effective: runtime.adaptive_floor_active_cap_effective, + adaptive_floor_warm_cap_configured: runtime.adaptive_floor_warm_cap_configured, + adaptive_floor_warm_cap_effective: runtime.adaptive_floor_warm_cap_effective, + adaptive_floor_active_writers_current: runtime.adaptive_floor_active_writers_current, + adaptive_floor_warm_writers_current: runtime.adaptive_floor_warm_writers_current, me_keepalive_enabled: runtime.me_keepalive_enabled, me_keepalive_interval_secs: runtime.me_keepalive_interval_secs, me_keepalive_jitter_secs: runtime.me_keepalive_jitter_secs, diff --git a/src/api/runtime_zero.rs b/src/api/runtime_zero.rs index 61b6844..5020705 100644 --- a/src/api/runtime_zero.rs +++ b/src/api/runtime_zero.rs @@ -5,6 +5,7 @@ use serde::Serialize; use crate::config::{MeFloorMode, ProxyConfig, UserMaxUniqueIpsMode}; use super::ApiShared; +use super::runtime_init::build_runtime_startup_summary; #[derive(Serialize)] pub(super) struct SystemInfoData { @@ -34,6 +35,9 @@ pub(super) struct RuntimeGatesData { pub(super) me_runtime_ready: bool, pub(super) me2dc_fallback_enabled: bool, pub(super) use_middle_proxy: bool, + pub(super) startup_status: &'static str, + pub(super) startup_stage: String, + pub(super) startup_progress_pct: f64, } #[derive(Serialize)] @@ -66,6 +70,10 @@ pub(super) struct EffectiveMiddleProxyLimits { pub(super) adaptive_floor_cpu_cores_override: u16, pub(super) adaptive_floor_max_extra_writers_single_per_core: u16, pub(super) adaptive_floor_max_extra_writers_multi_per_core: u16, + pub(super) adaptive_floor_max_active_writers_per_core: u16, + pub(super) adaptive_floor_max_warm_writers_per_core: u16, + pub(super) adaptive_floor_max_active_writers_global: u32, + pub(super) adaptive_floor_max_warm_writers_global: u32, pub(super) reconnect_max_concurrent_per_dc: u32, pub(super) reconnect_backoff_base_ms: u64, pub(super) reconnect_backoff_cap_ms: u64, @@ -142,12 +150,18 @@ pub(super) fn build_system_info_data( } } -pub(super) fn build_runtime_gates_data(shared: &ApiShared, cfg: &ProxyConfig) -> RuntimeGatesData { +pub(super) async fn build_runtime_gates_data( + shared: &ApiShared, + cfg: &ProxyConfig, +) -> RuntimeGatesData { + let startup_summary = build_runtime_startup_summary(shared).await; let me_runtime_ready = if !cfg.general.use_middle_proxy { true } else { shared .me_pool + .read() + .await .as_ref() .map(|pool| pool.is_runtime_ready()) .unwrap_or(false) @@ -159,6 +173,9 @@ pub(super) fn build_runtime_gates_data(shared: &ApiShared, cfg: &ProxyConfig) -> me_runtime_ready, me2dc_fallback_enabled: cfg.general.me2dc_fallback, use_middle_proxy: cfg.general.use_middle_proxy, + startup_status: startup_summary.status, + startup_stage: startup_summary.stage, + startup_progress_pct: startup_summary.progress_pct, } } @@ -204,6 +221,18 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD adaptive_floor_max_extra_writers_multi_per_core: cfg .general .me_adaptive_floor_max_extra_writers_multi_per_core, + adaptive_floor_max_active_writers_per_core: cfg + .general + .me_adaptive_floor_max_active_writers_per_core, + adaptive_floor_max_warm_writers_per_core: cfg + .general + .me_adaptive_floor_max_warm_writers_per_core, + adaptive_floor_max_active_writers_global: cfg + .general + .me_adaptive_floor_max_active_writers_global, + adaptive_floor_max_warm_writers_global: cfg + .general + .me_adaptive_floor_max_warm_writers_global, reconnect_max_concurrent_per_dc: cfg.general.me_reconnect_max_concurrent_per_dc, reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms, reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms, diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 465cef1..ce55394 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -17,6 +17,10 @@ const DEFAULT_ME_ADAPTIVE_FLOOR_WRITERS_PER_CORE_TOTAL: u16 = 48; const DEFAULT_ME_ADAPTIVE_FLOOR_CPU_CORES_OVERRIDE: u16 = 0; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_SINGLE_PER_CORE: u16 = 1; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_MULTI_PER_CORE: u16 = 2; +const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_PER_CORE: u16 = 64; +const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_PER_CORE: u16 = 64; +const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_GLOBAL: u32 = 256; +const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL: u32 = 256; const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5; @@ -276,6 +280,22 @@ pub(crate) fn default_me_adaptive_floor_max_extra_writers_multi_per_core() -> u1 DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_MULTI_PER_CORE } +pub(crate) fn default_me_adaptive_floor_max_active_writers_per_core() -> u16 { + DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_PER_CORE +} + +pub(crate) fn default_me_adaptive_floor_max_warm_writers_per_core() -> u16 { + DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_PER_CORE +} + +pub(crate) fn default_me_adaptive_floor_max_active_writers_global() -> u32 { + DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_GLOBAL +} + +pub(crate) fn default_me_adaptive_floor_max_warm_writers_global() -> u32 { + DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL +} + pub(crate) fn default_upstream_connect_retry_attempts() -> u32 { DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index a24f9d5..a7ae60c 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -84,6 +84,10 @@ pub struct HotFields { pub me_adaptive_floor_cpu_cores_override: u16, pub me_adaptive_floor_max_extra_writers_single_per_core: u16, pub me_adaptive_floor_max_extra_writers_multi_per_core: u16, + pub me_adaptive_floor_max_active_writers_per_core: u16, + pub me_adaptive_floor_max_warm_writers_per_core: u16, + pub me_adaptive_floor_max_active_writers_global: u32, + pub me_adaptive_floor_max_warm_writers_global: u32, pub me_route_backpressure_base_timeout_ms: u64, pub me_route_backpressure_high_timeout_ms: u64, pub me_route_backpressure_high_watermark_pct: u8, @@ -173,6 +177,18 @@ impl HotFields { me_adaptive_floor_max_extra_writers_multi_per_core: cfg .general .me_adaptive_floor_max_extra_writers_multi_per_core, + me_adaptive_floor_max_active_writers_per_core: cfg + .general + .me_adaptive_floor_max_active_writers_per_core, + me_adaptive_floor_max_warm_writers_per_core: cfg + .general + .me_adaptive_floor_max_warm_writers_per_core, + me_adaptive_floor_max_active_writers_global: cfg + .general + .me_adaptive_floor_max_active_writers_global, + me_adaptive_floor_max_warm_writers_global: cfg + .general + .me_adaptive_floor_max_warm_writers_global, me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms, me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms, me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct, @@ -305,6 +321,14 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig { new.general.me_adaptive_floor_max_extra_writers_single_per_core; cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core = new.general.me_adaptive_floor_max_extra_writers_multi_per_core; + cfg.general.me_adaptive_floor_max_active_writers_per_core = + new.general.me_adaptive_floor_max_active_writers_per_core; + cfg.general.me_adaptive_floor_max_warm_writers_per_core = + new.general.me_adaptive_floor_max_warm_writers_per_core; + cfg.general.me_adaptive_floor_max_active_writers_global = + new.general.me_adaptive_floor_max_active_writers_global; + cfg.general.me_adaptive_floor_max_warm_writers_global = + new.general.me_adaptive_floor_max_warm_writers_global; cfg.general.me_route_backpressure_base_timeout_ms = new.general.me_route_backpressure_base_timeout_ms; cfg.general.me_route_backpressure_high_timeout_ms = @@ -739,9 +763,17 @@ fn log_changes( != new_hot.me_adaptive_floor_max_extra_writers_single_per_core || old_hot.me_adaptive_floor_max_extra_writers_multi_per_core != new_hot.me_adaptive_floor_max_extra_writers_multi_per_core + || old_hot.me_adaptive_floor_max_active_writers_per_core + != new_hot.me_adaptive_floor_max_active_writers_per_core + || old_hot.me_adaptive_floor_max_warm_writers_per_core + != new_hot.me_adaptive_floor_max_warm_writers_per_core + || old_hot.me_adaptive_floor_max_active_writers_global + != new_hot.me_adaptive_floor_max_active_writers_global + || old_hot.me_adaptive_floor_max_warm_writers_global + != new_hot.me_adaptive_floor_max_warm_writers_global { info!( - "config reload: me_floor: mode={:?} idle={}s min_single={} min_multi={} recover_grace={}s per_core_total={} cores_override={} extra_single_per_core={} extra_multi_per_core={}", + "config reload: me_floor: mode={:?} idle={}s min_single={} min_multi={} recover_grace={}s per_core_total={} cores_override={} extra_single_per_core={} extra_multi_per_core={} max_active_per_core={} max_warm_per_core={} max_active_global={} max_warm_global={}", new_hot.me_floor_mode, new_hot.me_adaptive_floor_idle_secs, new_hot.me_adaptive_floor_min_writers_single_endpoint, @@ -751,6 +783,10 @@ fn log_changes( new_hot.me_adaptive_floor_cpu_cores_override, new_hot.me_adaptive_floor_max_extra_writers_single_per_core, new_hot.me_adaptive_floor_max_extra_writers_multi_per_core, + new_hot.me_adaptive_floor_max_active_writers_per_core, + new_hot.me_adaptive_floor_max_warm_writers_per_core, + new_hot.me_adaptive_floor_max_active_writers_global, + new_hot.me_adaptive_floor_max_warm_writers_global, ); } diff --git a/src/config/load.rs b/src/config/load.rs index e6dc728..25a9994 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -327,6 +327,30 @@ impl ProxyConfig { )); } + if config.general.me_adaptive_floor_max_active_writers_per_core == 0 { + return Err(ProxyError::Config( + "general.me_adaptive_floor_max_active_writers_per_core must be > 0".to_string(), + )); + } + + if config.general.me_adaptive_floor_max_warm_writers_per_core == 0 { + return Err(ProxyError::Config( + "general.me_adaptive_floor_max_warm_writers_per_core must be > 0".to_string(), + )); + } + + if config.general.me_adaptive_floor_max_active_writers_global == 0 { + return Err(ProxyError::Config( + "general.me_adaptive_floor_max_active_writers_global must be > 0".to_string(), + )); + } + + if config.general.me_adaptive_floor_max_warm_writers_global == 0 { + return Err(ProxyError::Config( + "general.me_adaptive_floor_max_warm_writers_global must be > 0".to_string(), + )); + } + if config.general.me_single_endpoint_outage_backoff_min_ms == 0 { return Err(ProxyError::Config( "general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(), @@ -1238,6 +1262,46 @@ mod tests { let _ = std::fs::remove_file(path); } + #[test] + fn me_adaptive_floor_max_active_writers_per_core_zero_is_rejected() { + let toml = r#" + [general] + me_adaptive_floor_max_active_writers_per_core = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_adaptive_floor_max_active_per_core_zero_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.me_adaptive_floor_max_active_writers_per_core must be > 0")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn me_adaptive_floor_max_warm_writers_global_zero_is_rejected() { + let toml = r#" + [general] + me_adaptive_floor_max_warm_writers_global = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_adaptive_floor_max_warm_global_zero_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.me_adaptive_floor_max_warm_writers_global must be > 0")); + let _ = std::fs::remove_file(path); + } + #[test] fn upstream_connect_retry_attempts_zero_is_rejected() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index 5a0dbb2..0c89df8 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -545,6 +545,22 @@ pub struct GeneralConfig { #[serde(default = "default_me_adaptive_floor_max_extra_writers_multi_per_core")] pub me_adaptive_floor_max_extra_writers_multi_per_core: u16, + /// Hard cap for active ME writers per logical CPU core. + #[serde(default = "default_me_adaptive_floor_max_active_writers_per_core")] + pub me_adaptive_floor_max_active_writers_per_core: u16, + + /// Hard cap for warm ME writers per logical CPU core. + #[serde(default = "default_me_adaptive_floor_max_warm_writers_per_core")] + pub me_adaptive_floor_max_warm_writers_per_core: u16, + + /// Hard global cap for active ME writers. + #[serde(default = "default_me_adaptive_floor_max_active_writers_global")] + pub me_adaptive_floor_max_active_writers_global: u32, + + /// Hard global cap for warm ME writers. + #[serde(default = "default_me_adaptive_floor_max_warm_writers_global")] + pub me_adaptive_floor_max_warm_writers_global: u32, + /// Connect attempts for the selected upstream before returning error/fallback. #[serde(default = "default_upstream_connect_retry_attempts")] pub upstream_connect_retry_attempts: u32, @@ -802,6 +818,10 @@ impl Default for GeneralConfig { me_adaptive_floor_cpu_cores_override: default_me_adaptive_floor_cpu_cores_override(), me_adaptive_floor_max_extra_writers_single_per_core: default_me_adaptive_floor_max_extra_writers_single_per_core(), me_adaptive_floor_max_extra_writers_multi_per_core: default_me_adaptive_floor_max_extra_writers_multi_per_core(), + me_adaptive_floor_max_active_writers_per_core: default_me_adaptive_floor_max_active_writers_per_core(), + me_adaptive_floor_max_warm_writers_per_core: default_me_adaptive_floor_max_warm_writers_per_core(), + me_adaptive_floor_max_active_writers_global: default_me_adaptive_floor_max_active_writers_global(), + me_adaptive_floor_max_warm_writers_global: default_me_adaptive_floor_max_warm_writers_global(), upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(), upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(), upstream_connect_budget_ms: default_upstream_connect_budget_ms(), diff --git a/src/main.rs b/src/main.rs index ca24d0e..58c70cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use rand::Rng; use tokio::net::TcpListener; use tokio::signal; -use tokio::sync::{Semaphore, mpsc, watch}; +use tokio::sync::{RwLock, Semaphore, mpsc, watch}; use tracing::{debug, error, info, warn}; use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload}; #[cfg(unix)] @@ -26,6 +26,7 @@ mod protocol; mod proxy; mod stats; mod stream; +mod startup; mod transport; mod tls_front; mod util; @@ -39,6 +40,14 @@ use crate::proxy::ClientHandler; use crate::stats::beobachten::BeobachtenStore; use crate::stats::telemetry::TelemetryPolicy; use crate::stats::{ReplayChecker, Stats}; +use crate::startup::{ + COMPONENT_API_BOOTSTRAP, COMPONENT_CONFIG_LOAD, COMPONENT_CONFIG_WATCHER_START, + COMPONENT_DC_CONNECTIVITY_PING, COMPONENT_LISTENERS_BIND, COMPONENT_ME_CONNECTIVITY_PING, + COMPONENT_ME_POOL_CONSTRUCT, COMPONENT_ME_POOL_INIT_STAGE1, COMPONENT_ME_PROXY_CONFIG_V4, + COMPONENT_ME_PROXY_CONFIG_V6, COMPONENT_ME_SECRET_FETCH, COMPONENT_METRICS_START, + COMPONENT_NETWORK_PROBE, COMPONENT_RUNTIME_READY, COMPONENT_TLS_FRONT_BOOTSTRAP, + COMPONENT_TRACING_INIT, StartupMeStatus, StartupTracker, +}; use crate::stream::BufferPool; use crate::transport::middle_proxy::{ MePool, ProxyConfigData, fetch_proxy_config_with_raw, format_me_route, format_sample_line, @@ -373,6 +382,10 @@ async fn main() -> std::result::Result<(), Box> { .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); + let startup_tracker = Arc::new(StartupTracker::new(process_started_at_epoch_secs)); + startup_tracker + .start_component(COMPONENT_CONFIG_LOAD, Some("load and validate config".to_string())) + .await; let (config_path, cli_silent, cli_log_level) = parse_cli(); let mut config = match ProxyConfig::load(&config_path) { @@ -399,6 +412,9 @@ async fn main() -> std::result::Result<(), Box> { eprintln!("[telemt] Invalid network.dns_overrides: {}", e); std::process::exit(1); } + startup_tracker + .complete_component(COMPONENT_CONFIG_LOAD, Some("config is ready".to_string())) + .await; let has_rust_log = std::env::var("RUST_LOG").is_ok(); let effective_log_level = if cli_silent { @@ -410,6 +426,9 @@ async fn main() -> std::result::Result<(), Box> { }; let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info")); + startup_tracker + .start_component(COMPONENT_TRACING_INIT, Some("initialize tracing subscriber".to_string())) + .await; // Configure color output based on config let fmt_layer = if config.general.disable_colors { @@ -422,6 +441,9 @@ async fn main() -> std::result::Result<(), Box> { .with(filter_layer) .with(fmt_layer) .init(); + startup_tracker + .complete_component(COMPONENT_TRACING_INIT, Some("tracing initialized".to_string())) + .await; info!("Telemt MTProxy v{}", env!("CARGO_PKG_VERSION")); info!("Log level: {}", effective_log_level); @@ -473,6 +495,95 @@ async fn main() -> std::result::Result<(), Box> { config.general.upstream_connect_failfast_hard_errors, stats.clone(), )); + let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.load_limits(&config.access.user_max_unique_ips).await; + ip_tracker + .set_limit_policy( + config.access.user_max_unique_ips_mode, + config.access.user_max_unique_ips_window_secs, + ) + .await; + if !config.access.user_max_unique_ips.is_empty() { + info!( + "IP limits configured for {} users", + config.access.user_max_unique_ips.len() + ); + } + if !config.network.dns_overrides.is_empty() { + info!( + "Runtime DNS overrides configured: {} entries", + config.network.dns_overrides.len() + ); + } + + let (api_config_tx, api_config_rx) = watch::channel(Arc::new(config.clone())); + let initial_admission_open = !config.general.use_middle_proxy; + let (admission_tx, admission_rx) = watch::channel(initial_admission_open); + let api_me_pool = Arc::new(RwLock::new(None::>)); + startup_tracker + .start_component(COMPONENT_API_BOOTSTRAP, Some("spawn API listener task".to_string())) + .await; + + if config.server.api.enabled { + let listen = match config.server.api.listen.parse::() { + Ok(listen) => listen, + Err(error) => { + warn!( + error = %error, + listen = %config.server.api.listen, + "Invalid server.api.listen; API is disabled" + ); + SocketAddr::from(([127, 0, 0, 1], 0)) + } + }; + if listen.port() != 0 { + let stats_api = stats.clone(); + let ip_tracker_api = ip_tracker.clone(); + let me_pool_api = api_me_pool.clone(); + let upstream_manager_api = upstream_manager.clone(); + let config_rx_api = api_config_rx.clone(); + let admission_rx_api = admission_rx.clone(); + let config_path_api = std::path::PathBuf::from(&config_path); + let startup_tracker_api = startup_tracker.clone(); + tokio::spawn(async move { + api::serve( + listen, + stats_api, + ip_tracker_api, + me_pool_api, + upstream_manager_api, + config_rx_api, + admission_rx_api, + config_path_api, + None, + None, + process_started_at_epoch_secs, + startup_tracker_api, + ) + .await; + }); + startup_tracker + .complete_component( + COMPONENT_API_BOOTSTRAP, + Some(format!("api task spawned on {}", listen)), + ) + .await; + } else { + startup_tracker + .skip_component( + COMPONENT_API_BOOTSTRAP, + Some("server.api.listen has zero port".to_string()), + ) + .await; + } + } else { + startup_tracker + .skip_component( + COMPONENT_API_BOOTSTRAP, + Some("server.api.enabled is false".to_string()), + ) + .await; + } let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len()); tls_domains.push(config.censorship.tls_domain.clone()); @@ -483,6 +594,12 @@ async fn main() -> std::result::Result<(), Box> { } // Start TLS front fetching in background immediately, in parallel with STUN probing. + startup_tracker + .start_component( + COMPONENT_TLS_FRONT_BOOTSTRAP, + Some("initialize TLS front cache/bootstrap tasks".to_string()), + ) + .await; let tls_cache: Option> = if config.censorship.tls_emulation { let cache = Arc::new(TlsFrontCache::new( &tls_domains, @@ -603,9 +720,26 @@ async fn main() -> std::result::Result<(), Box> { Some(cache) } else { + startup_tracker + .skip_component( + COMPONENT_TLS_FRONT_BOOTSTRAP, + Some("censorship.tls_emulation is false".to_string()), + ) + .await; None }; + if tls_cache.is_some() { + startup_tracker + .complete_component( + COMPONENT_TLS_FRONT_BOOTSTRAP, + Some("tls front cache is initialized".to_string()), + ) + .await; + } + startup_tracker + .start_component(COMPONENT_NETWORK_PROBE, Some("probe network capabilities".to_string())) + .await; let probe = run_probe( &config.network, config.general.middle_proxy_nat_probe, @@ -614,32 +748,18 @@ async fn main() -> std::result::Result<(), Box> { .await?; let decision = decide_network_capabilities(&config.network, &probe); log_probe_result(&probe, &decision); + startup_tracker + .complete_component( + COMPONENT_NETWORK_PROBE, + Some("network capabilities determined".to_string()), + ) + .await; let prefer_ipv6 = decision.prefer_ipv6(); let mut use_middle_proxy = config.general.use_middle_proxy; let beobachten = Arc::new(BeobachtenStore::new()); let rng = Arc::new(SecureRandom::new()); - // IP Tracker initialization - let ip_tracker = Arc::new(UserIpTracker::new()); - ip_tracker.load_limits(&config.access.user_max_unique_ips).await; - ip_tracker - .set_limit_policy( - config.access.user_max_unique_ips_mode, - config.access.user_max_unique_ips_window_secs, - ) - .await; - - if !config.access.user_max_unique_ips.is_empty() { - info!("IP limits configured for {} users", config.access.user_max_unique_ips.len()); - } - if !config.network.dns_overrides.is_empty() { - info!( - "Runtime DNS overrides configured: {} entries", - config.network.dns_overrides.len() - ); - } - // Connection concurrency limit let max_connections = Arc::new(Semaphore::new(10_000)); @@ -657,6 +777,59 @@ async fn main() -> std::result::Result<(), Box> { } } + if use_middle_proxy { + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_SECRET_FETCH) + .await; + startup_tracker + .start_component( + COMPONENT_ME_SECRET_FETCH, + Some("fetch proxy-secret from source/cache".to_string()), + ) + .await; + startup_tracker + .set_me_retry_limit(if !me2dc_fallback || me_init_retry_attempts == 0 { + "unlimited".to_string() + } else { + me_init_retry_attempts.to_string() + }) + .await; + } else { + startup_tracker + .set_me_status(StartupMeStatus::Skipped, "skipped") + .await; + startup_tracker + .skip_component( + COMPONENT_ME_SECRET_FETCH, + Some("middle proxy mode disabled".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("middle proxy mode disabled".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("middle proxy mode disabled".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("middle proxy mode disabled".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("middle proxy mode disabled".to_string()), + ) + .await; + } + // ===================================================================== // Middle Proxy initialization (if enabled) // ===================================================================== @@ -694,6 +867,9 @@ async fn main() -> std::result::Result<(), Box> { { Ok(proxy_secret) => break Some(proxy_secret), Err(e) => { + startup_tracker + .set_me_last_error(Some(e.to_string())) + .await; if me2dc_fallback { error!( error = %e, @@ -713,6 +889,12 @@ async fn main() -> std::result::Result<(), Box> { }; match proxy_secret { Some(proxy_secret) => { + startup_tracker + .complete_component( + COMPONENT_ME_SECRET_FETCH, + Some("proxy-secret loaded".to_string()), + ) + .await; info!( secret_len = proxy_secret.len(), key_sig = format_args!( @@ -731,6 +913,15 @@ async fn main() -> std::result::Result<(), Box> { "Proxy-secret loaded" ); + startup_tracker + .start_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("load startup proxy-config v4".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_PROXY_CONFIG_V4) + .await; let cfg_v4 = load_startup_proxy_config_snapshot( "https://core.telegram.org/getProxyConfig", config.general.proxy_config_v4_cache_path.as_deref(), @@ -738,6 +929,30 @@ async fn main() -> std::result::Result<(), Box> { "getProxyConfig", ) .await; + if cfg_v4.is_some() { + startup_tracker + .complete_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("proxy-config v4 loaded".to_string()), + ) + .await; + } else { + startup_tracker + .fail_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("proxy-config v4 unavailable".to_string()), + ) + .await; + } + startup_tracker + .start_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("load startup proxy-config v6".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_PROXY_CONFIG_V6) + .await; let cfg_v6 = load_startup_proxy_config_snapshot( "https://core.telegram.org/getProxyConfigV6", config.general.proxy_config_v6_cache_path.as_deref(), @@ -745,8 +960,32 @@ async fn main() -> std::result::Result<(), Box> { "getProxyConfigV6", ) .await; + if cfg_v6.is_some() { + startup_tracker + .complete_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("proxy-config v6 loaded".to_string()), + ) + .await; + } else { + startup_tracker + .fail_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("proxy-config v6 unavailable".to_string()), + ) + .await; + } if let (Some(cfg_v4), Some(cfg_v6)) = (cfg_v4, cfg_v6) { + startup_tracker + .start_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("construct ME pool".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_POOL_CONSTRUCT) + .await; let pool = MePool::new( proxy_tag.clone(), proxy_secret, @@ -792,6 +1031,10 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_adaptive_floor_cpu_cores_override, config.general.me_adaptive_floor_max_extra_writers_single_per_core, config.general.me_adaptive_floor_max_extra_writers_multi_per_core, + config.general.me_adaptive_floor_max_active_writers_per_core, + config.general.me_adaptive_floor_max_warm_writers_per_core, + config.general.me_adaptive_floor_max_active_writers_global, + config.general.me_adaptive_floor_max_warm_writers_global, config.general.hardswap, config.general.me_pool_drain_ttl_secs, config.general.effective_me_pool_force_close_secs(), @@ -813,12 +1056,44 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_route_inline_recovery_attempts, config.general.me_route_inline_recovery_wait_ms, ); + startup_tracker + .complete_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("ME pool object created".to_string()), + ) + .await; + *api_me_pool.write().await = Some(pool.clone()); + startup_tracker + .start_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("initialize ME pool writers".to_string()), + ) + .await; + startup_tracker + .set_me_status( + StartupMeStatus::Initializing, + COMPONENT_ME_POOL_INIT_STAGE1, + ) + .await; let mut init_attempt: u32 = 0; loop { init_attempt = init_attempt.saturating_add(1); + startup_tracker.set_me_init_attempt(init_attempt).await; match pool.init(pool_size, &rng).await { Ok(()) => { + startup_tracker + .set_me_last_error(None) + .await; + startup_tracker + .complete_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME pool initialized".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Ready, "ready") + .await; info!( attempt = init_attempt, "Middle-End pool initialized successfully" @@ -838,8 +1113,20 @@ async fn main() -> std::result::Result<(), Box> { break Some(pool); } Err(e) => { + startup_tracker + .set_me_last_error(Some(e.to_string())) + .await; let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; if retries_limited && init_attempt >= me_init_retry_attempts { + startup_tracker + .fail_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME init retry budget exhausted".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Failed, "failed") + .await; error!( error = %e, attempt = init_attempt, @@ -879,10 +1166,60 @@ async fn main() -> std::result::Result<(), Box> { } } } else { + startup_tracker + .skip_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("ME configs are incomplete".to_string()), + ) + .await; + startup_tracker + .fail_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME configs are incomplete".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Failed, "failed") + .await; None } } - None => None, + None => { + startup_tracker + .fail_component( + COMPONENT_ME_SECRET_FETCH, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .fail_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Failed, "failed") + .await; + None + } } } else { None @@ -890,12 +1227,33 @@ async fn main() -> std::result::Result<(), Box> { // If ME failed to initialize, force direct-only mode. if me_pool.is_some() { + startup_tracker + .set_transport_mode("middle_proxy") + .await; + startup_tracker + .set_degraded(false) + .await; info!("Transport: Middle-End Proxy - all DC-over-RPC"); } else { let _ = use_middle_proxy; use_middle_proxy = false; // Make runtime config reflect direct-only mode for handlers. config.general.use_middle_proxy = false; + startup_tracker + .set_transport_mode("direct") + .await; + startup_tracker + .set_degraded(true) + .await; + if me2dc_fallback { + startup_tracker + .set_me_status(StartupMeStatus::Failed, "fallback_to_direct") + .await; + } else { + startup_tracker + .set_me_status(StartupMeStatus::Skipped, "skipped") + .await; + } info!("Transport: Direct DC - TCP - standard DC-over-TCP"); } @@ -910,6 +1268,21 @@ async fn main() -> std::result::Result<(), Box> { let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096)); // Middle-End ping before DC connectivity + if me_pool.is_some() { + startup_tracker + .start_component( + COMPONENT_ME_CONNECTIVITY_PING, + Some("run startup ME connectivity check".to_string()), + ) + .await; + } else { + startup_tracker + .skip_component( + COMPONENT_ME_CONNECTIVITY_PING, + Some("ME pool is not available".to_string()), + ) + .await; + } if let Some(ref pool) = me_pool { let me_results = run_me_ping(pool, &rng).await; @@ -979,9 +1352,21 @@ async fn main() -> std::result::Result<(), Box> { } } info!("============================================================"); + startup_tracker + .complete_component( + COMPONENT_ME_CONNECTIVITY_PING, + Some("startup ME connectivity check completed".to_string()), + ) + .await; } info!("================= Telegram DC Connectivity ================="); + startup_tracker + .start_component( + COMPONENT_DC_CONNECTIVITY_PING, + Some("run startup DC connectivity check".to_string()), + ) + .await; let ping_results = upstream_manager .ping_all_dcs( @@ -1061,9 +1446,21 @@ async fn main() -> std::result::Result<(), Box> { info!("============================================================"); } } + startup_tracker + .complete_component( + COMPONENT_DC_CONNECTIVITY_PING, + Some("startup DC connectivity check completed".to_string()), + ) + .await; let initialized_secs = process_started_at.elapsed().as_secs(); let second_suffix = if initialized_secs == 1 { "" } else { "s" }; + startup_tracker + .start_component( + COMPONENT_RUNTIME_READY, + Some("finalize startup runtime state".to_string()), + ) + .await; info!("===================== Telegram Startup ====================="); info!( " DC/ME Initialized in {} second{}", @@ -1074,6 +1471,7 @@ async fn main() -> std::result::Result<(), Box> { if let Some(ref pool) = me_pool { pool.set_runtime_ready(true); } + *api_me_pool.write().await = me_pool.clone(); // Background tasks let um_clone = upstream_manager.clone(); @@ -1105,6 +1503,12 @@ async fn main() -> std::result::Result<(), Box> { // ── Hot-reload watcher ──────────────────────────────────────────────── // Uses inotify to detect file changes instantly (SIGHUP also works). // detected_ip_v4/v6 are passed so newly added users get correct TG links. + startup_tracker + .start_component( + COMPONENT_CONFIG_WATCHER_START, + Some("spawn config hot-reload watcher".to_string()), + ) + .await; let (config_rx, mut log_level_rx): ( tokio::sync::watch::Receiver>, tokio::sync::watch::Receiver, @@ -1114,6 +1518,23 @@ async fn main() -> std::result::Result<(), Box> { detected_ip_v4, detected_ip_v6, ); + startup_tracker + .complete_component( + COMPONENT_CONFIG_WATCHER_START, + Some("config hot-reload watcher started".to_string()), + ) + .await; + let mut config_rx_api_bridge = config_rx.clone(); + let api_config_tx_bridge = api_config_tx.clone(); + tokio::spawn(async move { + loop { + if config_rx_api_bridge.changed().await.is_err() { + break; + } + let cfg = config_rx_api_bridge.borrow_and_update().clone(); + api_config_tx_bridge.send_replace(cfg); + } + }); let stats_policy = stats.clone(); let mut config_rx_policy = config_rx.clone(); @@ -1244,6 +1665,12 @@ async fn main() -> std::result::Result<(), Box> { }); } + startup_tracker + .start_component( + COMPONENT_LISTENERS_BIND, + Some("bind TCP/Unix listeners".to_string()), + ) + .await; let mut listeners = Vec::new(); for listener_conf in &config.server.listeners { @@ -1345,7 +1772,6 @@ async fn main() -> std::result::Result<(), Box> { print_proxy_links(&host, port, &config); } - let (admission_tx, admission_rx) = watch::channel(true); if config.general.use_middle_proxy { if let Some(pool) = me_pool.as_ref() { let initial_open = pool.admission_ready_conditional_cast().await; @@ -1495,6 +1921,16 @@ async fn main() -> std::result::Result<(), Box> { } }); } + startup_tracker + .complete_component( + COMPONENT_LISTENERS_BIND, + Some(format!( + "listeners configured tcp={} unix={}", + listeners.len(), + has_unix_listener + )), + ) + .await; if listeners.is_empty() && !has_unix_listener { error!("No listeners. Exiting."); @@ -1528,6 +1964,12 @@ async fn main() -> std::result::Result<(), Box> { }); if let Some(port) = config.server.metrics_port { + startup_tracker + .start_component( + COMPONENT_METRICS_START, + Some(format!("spawn metrics endpoint on {}", port)), + ) + .await; let stats = stats.clone(); let beobachten = beobachten.clone(); let config_rx_metrics = config_rx.clone(); @@ -1544,48 +1986,28 @@ async fn main() -> std::result::Result<(), Box> { ) .await; }); + startup_tracker + .complete_component( + COMPONENT_METRICS_START, + Some("metrics task spawned".to_string()), + ) + .await; + } else { + startup_tracker + .skip_component( + COMPONENT_METRICS_START, + Some("server.metrics_port is not configured".to_string()), + ) + .await; } - if config.server.api.enabled { - let listen = match config.server.api.listen.parse::() { - Ok(listen) => listen, - Err(error) => { - warn!( - error = %error, - listen = %config.server.api.listen, - "Invalid server.api.listen; API is disabled" - ); - SocketAddr::from(([127, 0, 0, 1], 0)) - } - }; - if listen.port() != 0 { - let stats = stats.clone(); - let ip_tracker_api = ip_tracker.clone(); - let me_pool_api = me_pool.clone(); - let upstream_manager_api = upstream_manager.clone(); - let config_rx_api = config_rx.clone(); - let admission_rx_api = admission_rx.clone(); - let config_path_api = std::path::PathBuf::from(&config_path); - let startup_detected_ip_v4 = detected_ip_v4; - let startup_detected_ip_v6 = detected_ip_v6; - tokio::spawn(async move { - api::serve( - listen, - stats, - ip_tracker_api, - me_pool_api, - upstream_manager_api, - config_rx_api, - admission_rx_api, - config_path_api, - startup_detected_ip_v4, - startup_detected_ip_v6, - process_started_at_epoch_secs, - ) - .await; - }); - } - } + startup_tracker + .complete_component( + COMPONENT_RUNTIME_READY, + Some("startup pipeline is fully initialized".to_string()), + ) + .await; + startup_tracker.mark_ready().await; for (listener, listener_proxy_protocol) in listeners { let mut config_rx: tokio::sync::watch::Receiver> = config_rx.clone(); diff --git a/src/metrics.rs b/src/metrics.rs index 633a884..b338df5 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1053,6 +1053,102 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp 0 } ); + let _ = writeln!( + out, + "# HELP telemt_me_adaptive_floor_active_cap_configured Runtime configured active writer cap" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_adaptive_floor_active_cap_configured gauge" + ); + let _ = writeln!( + out, + "telemt_me_adaptive_floor_active_cap_configured {}", + if me_allows_normal { + stats.get_me_floor_active_cap_configured_gauge() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_adaptive_floor_active_cap_effective Runtime effective active writer cap" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_adaptive_floor_active_cap_effective gauge" + ); + let _ = writeln!( + out, + "telemt_me_adaptive_floor_active_cap_effective {}", + if me_allows_normal { + stats.get_me_floor_active_cap_effective_gauge() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_adaptive_floor_warm_cap_configured Runtime configured warm writer cap" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_adaptive_floor_warm_cap_configured gauge" + ); + let _ = writeln!( + out, + "telemt_me_adaptive_floor_warm_cap_configured {}", + if me_allows_normal { + stats.get_me_floor_warm_cap_configured_gauge() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_adaptive_floor_warm_cap_effective Runtime effective warm writer cap" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_adaptive_floor_warm_cap_effective gauge" + ); + let _ = writeln!( + out, + "telemt_me_adaptive_floor_warm_cap_effective {}", + if me_allows_normal { + stats.get_me_floor_warm_cap_effective_gauge() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_writers_active_current Current non-draining active ME writers" + ); + let _ = writeln!(out, "# TYPE telemt_me_writers_active_current gauge"); + let _ = writeln!( + out, + "telemt_me_writers_active_current {}", + if me_allows_normal { + stats.get_me_writers_active_current_gauge() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_writers_warm_current Current non-draining warm ME writers" + ); + let _ = writeln!(out, "# TYPE telemt_me_writers_warm_current gauge"); + let _ = writeln!( + out, + "telemt_me_writers_warm_current {}", + if me_allows_normal { + stats.get_me_writers_warm_current_gauge() + } else { + 0 + } + ); let _ = writeln!( out, "# HELP telemt_me_floor_cap_block_total Reconnect attempts blocked by adaptive floor caps" diff --git a/src/startup.rs b/src/startup.rs new file mode 100644 index 0000000..f6f857c --- /dev/null +++ b/src/startup.rs @@ -0,0 +1,373 @@ +use std::time::{Instant, SystemTime, UNIX_EPOCH}; + +use tokio::sync::RwLock; + +pub const COMPONENT_CONFIG_LOAD: &str = "config_load"; +pub const COMPONENT_TRACING_INIT: &str = "tracing_init"; +pub const COMPONENT_API_BOOTSTRAP: &str = "api_bootstrap"; +pub const COMPONENT_TLS_FRONT_BOOTSTRAP: &str = "tls_front_bootstrap"; +pub const COMPONENT_NETWORK_PROBE: &str = "network_probe"; +pub const COMPONENT_ME_SECRET_FETCH: &str = "me_secret_fetch"; +pub const COMPONENT_ME_PROXY_CONFIG_V4: &str = "me_proxy_config_fetch_v4"; +pub const COMPONENT_ME_PROXY_CONFIG_V6: &str = "me_proxy_config_fetch_v6"; +pub const COMPONENT_ME_POOL_CONSTRUCT: &str = "me_pool_construct"; +pub const COMPONENT_ME_POOL_INIT_STAGE1: &str = "me_pool_init_stage1"; +pub const COMPONENT_ME_CONNECTIVITY_PING: &str = "me_connectivity_ping"; +pub const COMPONENT_DC_CONNECTIVITY_PING: &str = "dc_connectivity_ping"; +pub const COMPONENT_LISTENERS_BIND: &str = "listeners_bind"; +pub const COMPONENT_CONFIG_WATCHER_START: &str = "config_watcher_start"; +pub const COMPONENT_METRICS_START: &str = "metrics_start"; +pub const COMPONENT_RUNTIME_READY: &str = "runtime_ready"; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum StartupStatus { + Initializing, + Ready, +} + +impl StartupStatus { + pub fn as_str(self) -> &'static str { + match self { + Self::Initializing => "initializing", + Self::Ready => "ready", + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum StartupComponentStatus { + Pending, + Running, + Ready, + Failed, + Skipped, +} + +impl StartupComponentStatus { + pub fn as_str(self) -> &'static str { + match self { + Self::Pending => "pending", + Self::Running => "running", + Self::Ready => "ready", + Self::Failed => "failed", + Self::Skipped => "skipped", + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum StartupMeStatus { + Pending, + Initializing, + Ready, + Failed, + Skipped, +} + +impl StartupMeStatus { + pub fn as_str(self) -> &'static str { + match self { + Self::Pending => "pending", + Self::Initializing => "initializing", + Self::Ready => "ready", + Self::Failed => "failed", + Self::Skipped => "skipped", + } + } +} + +#[derive(Clone, Debug)] +pub struct StartupComponentSnapshot { + pub id: &'static str, + pub title: &'static str, + pub weight: f64, + pub status: StartupComponentStatus, + pub started_at_epoch_ms: Option, + pub finished_at_epoch_ms: Option, + pub duration_ms: Option, + pub attempts: u32, + pub details: Option, +} + +#[derive(Clone, Debug)] +pub struct StartupMeSnapshot { + pub status: StartupMeStatus, + pub current_stage: String, + pub init_attempt: u32, + pub retry_limit: String, + pub last_error: Option, +} + +#[derive(Clone, Debug)] +pub struct StartupSnapshot { + pub status: StartupStatus, + pub degraded: bool, + pub current_stage: String, + pub started_at_epoch_secs: u64, + pub ready_at_epoch_secs: Option, + pub total_elapsed_ms: u64, + pub transport_mode: String, + pub me: StartupMeSnapshot, + pub components: Vec, +} + +#[derive(Clone, Debug)] +struct StartupComponent { + id: &'static str, + title: &'static str, + weight: f64, + status: StartupComponentStatus, + started_at_epoch_ms: Option, + finished_at_epoch_ms: Option, + duration_ms: Option, + attempts: u32, + details: Option, +} + +#[derive(Clone, Debug)] +struct StartupState { + status: StartupStatus, + degraded: bool, + current_stage: String, + started_at_epoch_secs: u64, + ready_at_epoch_secs: Option, + transport_mode: String, + me: StartupMeSnapshot, + components: Vec, +} + +pub struct StartupTracker { + started_at_instant: Instant, + state: RwLock, +} + +impl StartupTracker { + pub fn new(started_at_epoch_secs: u64) -> Self { + Self { + started_at_instant: Instant::now(), + state: RwLock::new(StartupState { + status: StartupStatus::Initializing, + degraded: false, + current_stage: COMPONENT_CONFIG_LOAD.to_string(), + started_at_epoch_secs, + ready_at_epoch_secs: None, + transport_mode: "unknown".to_string(), + me: StartupMeSnapshot { + status: StartupMeStatus::Pending, + current_stage: "pending".to_string(), + init_attempt: 0, + retry_limit: "unlimited".to_string(), + last_error: None, + }, + components: component_blueprint(), + }), + } + } + + pub async fn set_transport_mode(&self, mode: &'static str) { + self.state.write().await.transport_mode = mode.to_string(); + } + + pub async fn set_degraded(&self, degraded: bool) { + self.state.write().await.degraded = degraded; + } + + pub async fn start_component(&self, id: &'static str, details: Option) { + let mut guard = self.state.write().await; + guard.current_stage = id.to_string(); + if let Some(component) = guard.components.iter_mut().find(|component| component.id == id) { + if component.started_at_epoch_ms.is_none() { + component.started_at_epoch_ms = Some(now_epoch_ms()); + } + component.attempts = component.attempts.saturating_add(1); + component.status = StartupComponentStatus::Running; + component.details = normalize_details(details); + } + } + + pub async fn complete_component(&self, id: &'static str, details: Option) { + self.finish_component(id, StartupComponentStatus::Ready, details) + .await; + } + + pub async fn fail_component(&self, id: &'static str, details: Option) { + self.finish_component(id, StartupComponentStatus::Failed, details) + .await; + } + + pub async fn skip_component(&self, id: &'static str, details: Option) { + self.finish_component(id, StartupComponentStatus::Skipped, details) + .await; + } + + async fn finish_component( + &self, + id: &'static str, + status: StartupComponentStatus, + details: Option, + ) { + let mut guard = self.state.write().await; + let finished_at = now_epoch_ms(); + if let Some(component) = guard.components.iter_mut().find(|component| component.id == id) { + if component.started_at_epoch_ms.is_none() { + component.started_at_epoch_ms = Some(finished_at); + component.attempts = component.attempts.saturating_add(1); + } + component.finished_at_epoch_ms = Some(finished_at); + component.duration_ms = component + .started_at_epoch_ms + .map(|started_at| finished_at.saturating_sub(started_at)); + component.status = status; + component.details = normalize_details(details); + } + } + + pub async fn set_me_status(&self, status: StartupMeStatus, stage: &'static str) { + let mut guard = self.state.write().await; + guard.me.status = status; + guard.me.current_stage = stage.to_string(); + } + + pub async fn set_me_retry_limit(&self, retry_limit: String) { + self.state.write().await.me.retry_limit = retry_limit; + } + + pub async fn set_me_init_attempt(&self, attempt: u32) { + self.state.write().await.me.init_attempt = attempt; + } + + pub async fn set_me_last_error(&self, error: Option) { + self.state.write().await.me.last_error = normalize_details(error); + } + + pub async fn mark_ready(&self) { + let mut guard = self.state.write().await; + if guard.status == StartupStatus::Ready { + return; + } + guard.status = StartupStatus::Ready; + guard.current_stage = "ready".to_string(); + guard.ready_at_epoch_secs = Some(now_epoch_secs()); + } + + pub async fn snapshot(&self) -> StartupSnapshot { + let guard = self.state.read().await; + StartupSnapshot { + status: guard.status, + degraded: guard.degraded, + current_stage: guard.current_stage.clone(), + started_at_epoch_secs: guard.started_at_epoch_secs, + ready_at_epoch_secs: guard.ready_at_epoch_secs, + total_elapsed_ms: self.started_at_instant.elapsed().as_millis() as u64, + transport_mode: guard.transport_mode.clone(), + me: guard.me.clone(), + components: guard + .components + .iter() + .map(|component| StartupComponentSnapshot { + id: component.id, + title: component.title, + weight: component.weight, + status: component.status, + started_at_epoch_ms: component.started_at_epoch_ms, + finished_at_epoch_ms: component.finished_at_epoch_ms, + duration_ms: component.duration_ms, + attempts: component.attempts, + details: component.details.clone(), + }) + .collect(), + } + } +} + +pub fn compute_progress_pct(snapshot: &StartupSnapshot, me_stage_progress: Option) -> f64 { + if snapshot.status == StartupStatus::Ready { + return 100.0; + } + + let mut total_weight = 0.0f64; + let mut completed_weight = 0.0f64; + + for component in &snapshot.components { + total_weight += component.weight; + let unit_progress = match component.status { + StartupComponentStatus::Pending => 0.0, + StartupComponentStatus::Running => { + if component.id == COMPONENT_ME_POOL_INIT_STAGE1 { + me_stage_progress.unwrap_or(0.0).clamp(0.0, 1.0) + } else { + 0.0 + } + } + StartupComponentStatus::Ready + | StartupComponentStatus::Failed + | StartupComponentStatus::Skipped => 1.0, + }; + completed_weight += component.weight * unit_progress; + } + + if total_weight <= f64::EPSILON { + 0.0 + } else { + ((completed_weight / total_weight) * 100.0).clamp(0.0, 100.0) + } +} + +fn component_blueprint() -> Vec { + vec![ + component(COMPONENT_CONFIG_LOAD, "Config load", 5.0), + component(COMPONENT_TRACING_INIT, "Tracing init", 3.0), + component(COMPONENT_API_BOOTSTRAP, "API bootstrap", 5.0), + component(COMPONENT_TLS_FRONT_BOOTSTRAP, "TLS front bootstrap", 5.0), + component(COMPONENT_NETWORK_PROBE, "Network probe", 10.0), + component(COMPONENT_ME_SECRET_FETCH, "ME secret fetch", 8.0), + component(COMPONENT_ME_PROXY_CONFIG_V4, "ME config v4 fetch", 4.0), + component(COMPONENT_ME_PROXY_CONFIG_V6, "ME config v6 fetch", 4.0), + component(COMPONENT_ME_POOL_CONSTRUCT, "ME pool construct", 6.0), + component(COMPONENT_ME_POOL_INIT_STAGE1, "ME pool init stage1", 24.0), + component(COMPONENT_ME_CONNECTIVITY_PING, "ME connectivity ping", 6.0), + component(COMPONENT_DC_CONNECTIVITY_PING, "DC connectivity ping", 8.0), + component(COMPONENT_LISTENERS_BIND, "Listener bind", 8.0), + component(COMPONENT_CONFIG_WATCHER_START, "Config watcher start", 2.0), + component(COMPONENT_METRICS_START, "Metrics start", 1.0), + component(COMPONENT_RUNTIME_READY, "Runtime ready", 1.0), + ] +} + +fn component(id: &'static str, title: &'static str, weight: f64) -> StartupComponent { + StartupComponent { + id, + title, + weight, + status: StartupComponentStatus::Pending, + started_at_epoch_ms: None, + finished_at_epoch_ms: None, + duration_ms: None, + attempts: 0, + details: None, + } +} + +fn normalize_details(details: Option) -> Option { + details.map(|detail| { + if detail.len() <= 256 { + detail + } else { + detail[..256].to_string() + } + }) +} + +fn now_epoch_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +fn now_epoch_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} diff --git a/src/stats/mod.rs b/src/stats/mod.rs index fbfc987..10d8882 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -80,6 +80,12 @@ pub struct Stats { me_floor_global_cap_raw_gauge: AtomicU64, me_floor_global_cap_effective_gauge: AtomicU64, me_floor_target_writers_total_gauge: AtomicU64, + me_floor_active_cap_configured_gauge: AtomicU64, + me_floor_active_cap_effective_gauge: AtomicU64, + me_floor_warm_cap_configured_gauge: AtomicU64, + me_floor_warm_cap_effective_gauge: AtomicU64, + me_writers_active_current_gauge: AtomicU64, + me_writers_warm_current_gauge: AtomicU64, me_floor_cap_block_total: AtomicU64, me_floor_swap_idle_total: AtomicU64, me_floor_swap_idle_failed_total: AtomicU64, @@ -764,6 +770,42 @@ impl Stats { .store(value, Ordering::Relaxed); } } + pub fn set_me_floor_active_cap_configured_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_floor_active_cap_configured_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_floor_active_cap_effective_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_floor_active_cap_effective_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_floor_warm_cap_configured_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_floor_warm_cap_configured_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_floor_warm_cap_effective_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_floor_warm_cap_effective_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_writers_active_current_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_writers_active_current_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_writers_warm_current_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_writers_warm_current_gauge + .store(value, Ordering::Relaxed); + } + } pub fn increment_me_floor_cap_block_total(&self) { if self.telemetry_me_allows_normal() { self.me_floor_cap_block_total.fetch_add(1, Ordering::Relaxed); @@ -904,6 +946,30 @@ impl Stats { self.me_floor_target_writers_total_gauge .load(Ordering::Relaxed) } + pub fn get_me_floor_active_cap_configured_gauge(&self) -> u64 { + self.me_floor_active_cap_configured_gauge + .load(Ordering::Relaxed) + } + pub fn get_me_floor_active_cap_effective_gauge(&self) -> u64 { + self.me_floor_active_cap_effective_gauge + .load(Ordering::Relaxed) + } + pub fn get_me_floor_warm_cap_configured_gauge(&self) -> u64 { + self.me_floor_warm_cap_configured_gauge + .load(Ordering::Relaxed) + } + pub fn get_me_floor_warm_cap_effective_gauge(&self) -> u64 { + self.me_floor_warm_cap_effective_gauge + .load(Ordering::Relaxed) + } + pub fn get_me_writers_active_current_gauge(&self) -> u64 { + self.me_writers_active_current_gauge + .load(Ordering::Relaxed) + } + pub fn get_me_writers_warm_current_gauge(&self) -> u64 { + self.me_writers_warm_current_gauge + .load(Ordering::Relaxed) + } pub fn get_me_floor_cap_block_total(&self) -> u64 { self.me_floor_cap_block_total.load(Ordering::Relaxed) } diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 4bc3ff7..cccf381 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -321,6 +321,10 @@ async fn run_update_cycle( cfg.general.me_adaptive_floor_cpu_cores_override, cfg.general.me_adaptive_floor_max_extra_writers_single_per_core, cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core, + cfg.general.me_adaptive_floor_max_active_writers_per_core, + cfg.general.me_adaptive_floor_max_warm_writers_per_core, + cfg.general.me_adaptive_floor_max_active_writers_global, + cfg.general.me_adaptive_floor_max_warm_writers_global, ); let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1); @@ -538,6 +542,10 @@ pub async fn me_config_updater( cfg.general.me_adaptive_floor_cpu_cores_override, cfg.general.me_adaptive_floor_max_extra_writers_single_per_core, cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core, + cfg.general.me_adaptive_floor_max_active_writers_per_core, + cfg.general.me_adaptive_floor_max_warm_writers_per_core, + cfg.general.me_adaptive_floor_max_active_writers_global, + cfg.general.me_adaptive_floor_max_warm_writers_global, ); let new_secs = cfg.general.effective_update_every_secs().max(1); if new_secs == update_every_secs { diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index b0536cc..aefeead 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -42,7 +42,12 @@ struct DcFloorPlanEntry { #[derive(Debug, Clone)] struct FamilyFloorPlan { by_dc: HashMap, - global_cap_effective_total: usize, + active_cap_configured_total: usize, + active_cap_effective_total: usize, + warm_cap_configured_total: usize, + warm_cap_effective_total: usize, + active_writers_current: usize, + warm_writers_current: usize, target_writers_total: usize, } @@ -169,6 +174,14 @@ async fn check_family( for writer in pool.writers.read().await.iter().filter(|w| { !w.draining.load(std::sync::atomic::Ordering::Relaxed) }) { + if !matches!( + super::pool::WriterContour::from_u8( + writer.contour.load(std::sync::atomic::Ordering::Relaxed), + ), + super::pool::WriterContour::Active + ) { + continue; + } let key = (writer.writer_dc, writer.addr); *live_addr_counts.entry(key).or_insert(0) += 1; live_writer_ids_by_addr @@ -194,8 +207,13 @@ async fn check_family( ) .await; pool.set_adaptive_floor_runtime_caps( - floor_plan.global_cap_effective_total, + floor_plan.active_cap_configured_total, + floor_plan.active_cap_effective_total, + floor_plan.warm_cap_configured_total, + floor_plan.warm_cap_effective_total, floor_plan.target_writers_total, + floor_plan.active_writers_current, + floor_plan.warm_writers_current, ); for (dc, endpoints) in dc_endpoints { @@ -344,8 +362,8 @@ async fn check_family( break; } reconnect_budget = reconnect_budget.saturating_sub(1); - if pool.floor_mode() == MeFloorMode::Adaptive - && pool.active_writer_count_total().await >= floor_plan.global_cap_effective_total + if pool.active_contour_writer_count_total().await + >= floor_plan.active_cap_effective_total { let swapped = maybe_swap_idle_writer_for_cap( pool, @@ -370,7 +388,7 @@ async fn check_family( ?family, alive, required, - global_cap_effective_total = floor_plan.global_cap_effective_total, + active_cap_effective_total = floor_plan.active_cap_effective_total, "Adaptive floor cap reached, reconnect attempt blocked" ); break; @@ -518,6 +536,8 @@ async fn build_family_floor_plan( let floor_mode = pool.floor_mode(); let is_adaptive = floor_mode == MeFloorMode::Adaptive; let cpu_cores = pool.adaptive_floor_effective_cpu_cores().max(1); + let (active_writers_current, warm_writers_current, _) = + pool.non_draining_writer_counts_by_contour().await; for (dc, endpoints) in dc_endpoints { if endpoints.is_empty() { @@ -576,9 +596,16 @@ async fn build_family_floor_plan( } if entries.is_empty() { + let active_cap_configured_total = pool.adaptive_floor_active_cap_configured_total(); + let warm_cap_configured_total = pool.adaptive_floor_warm_cap_configured_total(); return FamilyFloorPlan { by_dc, - global_cap_effective_total: 0, + active_cap_configured_total, + active_cap_effective_total: active_cap_configured_total, + warm_cap_configured_total, + warm_cap_effective_total: warm_cap_configured_total, + active_writers_current, + warm_writers_current, target_writers_total: 0, }; } @@ -588,20 +615,26 @@ async fn build_family_floor_plan( .iter() .map(|entry| entry.target_required) .sum::(); - let active_total = pool.active_writer_count_total().await; + let active_cap_configured_total = pool.adaptive_floor_active_cap_configured_total(); + let warm_cap_configured_total = pool.adaptive_floor_warm_cap_configured_total(); for entry in entries { by_dc.insert(entry.dc, entry); } return FamilyFloorPlan { by_dc, - global_cap_effective_total: active_total.max(target_total), + active_cap_configured_total, + active_cap_effective_total: active_cap_configured_total.max(target_total), + warm_cap_configured_total, + warm_cap_effective_total: warm_cap_configured_total, + active_writers_current, + warm_writers_current, target_writers_total: target_total, }; } - let global_cap_raw = pool.adaptive_floor_global_cap_raw(); - let total_active = pool.active_writer_count_total().await; - let other_active = total_active.saturating_sub(family_active_total); + let active_cap_configured_total = pool.adaptive_floor_active_cap_configured_total(); + let warm_cap_configured_total = pool.adaptive_floor_warm_cap_configured_total(); + let other_active = active_writers_current.saturating_sub(family_active_total); let min_sum = entries .iter() .map(|entry| entry.min_required) @@ -610,7 +643,7 @@ async fn build_family_floor_plan( .iter() .map(|entry| entry.target_required) .sum::(); - let family_cap = global_cap_raw + let family_cap = active_cap_configured_total .saturating_sub(other_active) .max(min_sum); if target_sum > family_cap { @@ -645,11 +678,17 @@ async fn build_family_floor_plan( for entry in entries { by_dc.insert(entry.dc, entry); } - let global_cap_effective_total = global_cap_raw.max(other_active.saturating_add(min_sum)); + let active_cap_effective_total = + active_cap_configured_total.max(other_active.saturating_add(min_sum)); let target_writers_total = other_active.saturating_add(target_sum); FamilyFloorPlan { by_dc, - global_cap_effective_total, + active_cap_configured_total, + active_cap_effective_total, + warm_cap_configured_total, + warm_cap_effective_total: warm_cap_configured_total, + active_writers_current, + warm_writers_current, target_writers_total, } } diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 13259bb..5ec512a 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -125,11 +125,21 @@ pub struct MePool { pub(super) me_adaptive_floor_cpu_cores_override: AtomicU32, pub(super) me_adaptive_floor_max_extra_writers_single_per_core: AtomicU32, pub(super) me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32, + pub(super) me_adaptive_floor_max_active_writers_per_core: AtomicU32, + pub(super) me_adaptive_floor_max_warm_writers_per_core: AtomicU32, + pub(super) me_adaptive_floor_max_active_writers_global: AtomicU32, + pub(super) me_adaptive_floor_max_warm_writers_global: AtomicU32, pub(super) me_adaptive_floor_cpu_cores_detected: AtomicU32, pub(super) me_adaptive_floor_cpu_cores_effective: AtomicU32, pub(super) me_adaptive_floor_global_cap_raw: AtomicU64, pub(super) me_adaptive_floor_global_cap_effective: AtomicU64, pub(super) me_adaptive_floor_target_writers_total: AtomicU64, + pub(super) me_adaptive_floor_active_cap_configured: AtomicU64, + pub(super) me_adaptive_floor_active_cap_effective: AtomicU64, + pub(super) me_adaptive_floor_warm_cap_configured: AtomicU64, + pub(super) me_adaptive_floor_warm_cap_effective: AtomicU64, + pub(super) me_adaptive_floor_active_writers_current: AtomicU64, + pub(super) me_adaptive_floor_warm_writers_current: AtomicU64, pub(super) proxy_map_v4: Arc>>>, pub(super) proxy_map_v6: Arc>>>, pub(super) endpoint_dc_map: Arc>>>, @@ -243,6 +253,10 @@ impl MePool { me_adaptive_floor_cpu_cores_override: u16, me_adaptive_floor_max_extra_writers_single_per_core: u16, me_adaptive_floor_max_extra_writers_multi_per_core: u16, + me_adaptive_floor_max_active_writers_per_core: u16, + me_adaptive_floor_max_warm_writers_per_core: u16, + me_adaptive_floor_max_active_writers_global: u32, + me_adaptive_floor_max_warm_writers_global: u32, hardswap: bool, me_pool_drain_ttl_secs: u64, me_pool_force_close_secs: u64, @@ -358,11 +372,29 @@ impl MePool { me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32::new( me_adaptive_floor_max_extra_writers_multi_per_core as u32, ), + me_adaptive_floor_max_active_writers_per_core: AtomicU32::new( + me_adaptive_floor_max_active_writers_per_core as u32, + ), + me_adaptive_floor_max_warm_writers_per_core: AtomicU32::new( + me_adaptive_floor_max_warm_writers_per_core as u32, + ), + me_adaptive_floor_max_active_writers_global: AtomicU32::new( + me_adaptive_floor_max_active_writers_global, + ), + me_adaptive_floor_max_warm_writers_global: AtomicU32::new( + me_adaptive_floor_max_warm_writers_global, + ), me_adaptive_floor_cpu_cores_detected: AtomicU32::new(1), me_adaptive_floor_cpu_cores_effective: AtomicU32::new(1), me_adaptive_floor_global_cap_raw: AtomicU64::new(0), me_adaptive_floor_global_cap_effective: AtomicU64::new(0), me_adaptive_floor_target_writers_total: AtomicU64::new(0), + me_adaptive_floor_active_cap_configured: AtomicU64::new(0), + me_adaptive_floor_active_cap_effective: AtomicU64::new(0), + me_adaptive_floor_warm_cap_configured: AtomicU64::new(0), + me_adaptive_floor_warm_cap_effective: AtomicU64::new(0), + me_adaptive_floor_active_writers_current: AtomicU64::new(0), + me_adaptive_floor_warm_writers_current: AtomicU64::new(0), pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), @@ -453,6 +485,10 @@ impl MePool { adaptive_floor_cpu_cores_override: u16, adaptive_floor_max_extra_writers_single_per_core: u16, adaptive_floor_max_extra_writers_multi_per_core: u16, + adaptive_floor_max_active_writers_per_core: u16, + adaptive_floor_max_warm_writers_per_core: u16, + adaptive_floor_max_active_writers_global: u32, + adaptive_floor_max_warm_writers_global: u32, ) { self.hardswap.store(hardswap, Ordering::Relaxed); self.me_pool_drain_ttl_secs @@ -514,6 +550,20 @@ impl MePool { adaptive_floor_max_extra_writers_multi_per_core as u32, Ordering::Relaxed, ); + self.me_adaptive_floor_max_active_writers_per_core + .store( + adaptive_floor_max_active_writers_per_core as u32, + Ordering::Relaxed, + ); + self.me_adaptive_floor_max_warm_writers_per_core + .store( + adaptive_floor_max_warm_writers_per_core as u32, + Ordering::Relaxed, + ); + self.me_adaptive_floor_max_active_writers_global + .store(adaptive_floor_max_active_writers_global, Ordering::Relaxed); + self.me_adaptive_floor_max_warm_writers_global + .store(adaptive_floor_max_warm_writers_global, Ordering::Relaxed); if previous_floor_mode != floor_mode { self.stats.increment_me_floor_mode_switch_total(); match (previous_floor_mode, floor_mode) { @@ -584,11 +634,26 @@ impl MePool { self.proxy_secret.read().await.key_selector } - pub(super) async fn active_writer_count_total(&self) -> usize { + pub(super) async fn non_draining_writer_counts_by_contour(&self) -> (usize, usize, usize) { let ws = self.writers.read().await; - ws.iter() - .filter(|w| !w.draining.load(Ordering::Relaxed)) - .count() + let mut active = 0usize; + let mut warm = 0usize; + for writer in ws.iter() { + if writer.draining.load(Ordering::Relaxed) { + continue; + } + match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) { + WriterContour::Active => active = active.saturating_add(1), + WriterContour::Warm => warm = warm.saturating_add(1), + WriterContour::Draining => {} + } + } + (active, warm, active.saturating_add(warm)) + } + + pub(super) async fn active_contour_writer_count_total(&self) -> usize { + let (active, _, _) = self.non_draining_writer_counts_by_contour().await; + active } pub(super) async fn secret_snapshot(&self) -> SecretSnapshot { @@ -634,13 +699,6 @@ impl MePool { .max(1) } - pub(super) fn adaptive_floor_writers_per_core_total(&self) -> usize { - (self - .me_adaptive_floor_writers_per_core_total - .load(Ordering::Relaxed) as usize) - .max(1) - } - pub(super) fn adaptive_floor_max_extra_single_per_core(&self) -> usize { self.me_adaptive_floor_max_extra_writers_single_per_core .load(Ordering::Relaxed) as usize @@ -651,6 +709,34 @@ impl MePool { .load(Ordering::Relaxed) as usize } + pub(super) fn adaptive_floor_max_active_writers_per_core(&self) -> usize { + (self + .me_adaptive_floor_max_active_writers_per_core + .load(Ordering::Relaxed) as usize) + .max(1) + } + + pub(super) fn adaptive_floor_max_warm_writers_per_core(&self) -> usize { + (self + .me_adaptive_floor_max_warm_writers_per_core + .load(Ordering::Relaxed) as usize) + .max(1) + } + + pub(super) fn adaptive_floor_max_active_writers_global(&self) -> usize { + (self + .me_adaptive_floor_max_active_writers_global + .load(Ordering::Relaxed) as usize) + .max(1) + } + + pub(super) fn adaptive_floor_max_warm_writers_global(&self) -> usize { + (self + .me_adaptive_floor_max_warm_writers_global + .load(Ordering::Relaxed) as usize) + .max(1) + } + pub(super) fn adaptive_floor_detected_cpu_cores(&self) -> usize { std::thread::available_parallelism() .map(|value| value.get()) @@ -679,28 +765,126 @@ impl MePool { effective } - pub(super) fn adaptive_floor_global_cap_raw(&self) -> usize { + pub(super) fn adaptive_floor_active_cap_configured_total(&self) -> usize { let cores = self.adaptive_floor_effective_cpu_cores(); - let cap = cores.saturating_mul(self.adaptive_floor_writers_per_core_total()); - self.me_adaptive_floor_global_cap_raw - .store(cap as u64, Ordering::Relaxed); - self.stats.set_me_floor_global_cap_raw_gauge(cap as u64); - cap + let per_core_cap = cores.saturating_mul(self.adaptive_floor_max_active_writers_per_core()); + let configured = per_core_cap.min(self.adaptive_floor_max_active_writers_global()); + self.me_adaptive_floor_active_cap_configured + .store(configured as u64, Ordering::Relaxed); + self.stats + .set_me_floor_active_cap_configured_gauge(configured as u64); + configured + } + + pub(super) fn adaptive_floor_warm_cap_configured_total(&self) -> usize { + let cores = self.adaptive_floor_effective_cpu_cores(); + let per_core_cap = cores.saturating_mul(self.adaptive_floor_max_warm_writers_per_core()); + let configured = per_core_cap.min(self.adaptive_floor_max_warm_writers_global()); + self.me_adaptive_floor_warm_cap_configured + .store(configured as u64, Ordering::Relaxed); + self.stats + .set_me_floor_warm_cap_configured_gauge(configured as u64); + configured } pub(super) fn set_adaptive_floor_runtime_caps( &self, - global_cap_effective: usize, + active_cap_configured: usize, + active_cap_effective: usize, + warm_cap_configured: usize, + warm_cap_effective: usize, target_writers_total: usize, + active_writers_current: usize, + warm_writers_current: usize, ) { + self.me_adaptive_floor_global_cap_raw + .store(active_cap_configured as u64, Ordering::Relaxed); self.me_adaptive_floor_global_cap_effective - .store(global_cap_effective as u64, Ordering::Relaxed); + .store(active_cap_effective as u64, Ordering::Relaxed); self.me_adaptive_floor_target_writers_total .store(target_writers_total as u64, Ordering::Relaxed); + self.me_adaptive_floor_active_cap_configured + .store(active_cap_configured as u64, Ordering::Relaxed); + self.me_adaptive_floor_active_cap_effective + .store(active_cap_effective as u64, Ordering::Relaxed); + self.me_adaptive_floor_warm_cap_configured + .store(warm_cap_configured as u64, Ordering::Relaxed); + self.me_adaptive_floor_warm_cap_effective + .store(warm_cap_effective as u64, Ordering::Relaxed); + self.me_adaptive_floor_active_writers_current + .store(active_writers_current as u64, Ordering::Relaxed); + self.me_adaptive_floor_warm_writers_current + .store(warm_writers_current as u64, Ordering::Relaxed); self.stats - .set_me_floor_global_cap_effective_gauge(global_cap_effective as u64); + .set_me_floor_global_cap_raw_gauge(active_cap_configured as u64); + self.stats + .set_me_floor_global_cap_effective_gauge(active_cap_effective as u64); self.stats .set_me_floor_target_writers_total_gauge(target_writers_total as u64); + self.stats + .set_me_floor_active_cap_configured_gauge(active_cap_configured as u64); + self.stats + .set_me_floor_active_cap_effective_gauge(active_cap_effective as u64); + self.stats + .set_me_floor_warm_cap_configured_gauge(warm_cap_configured as u64); + self.stats + .set_me_floor_warm_cap_effective_gauge(warm_cap_effective as u64); + self.stats + .set_me_writers_active_current_gauge(active_writers_current as u64); + self.stats + .set_me_writers_warm_current_gauge(warm_writers_current as u64); + } + + pub(super) async fn active_coverage_required_total(&self) -> usize { + let mut endpoints_by_dc = HashMap::>::new(); + + if self.decision.ipv4_me { + let map = self.proxy_map_v4.read().await; + for (dc, addrs) in map.iter() { + let entry = endpoints_by_dc.entry(*dc).or_default(); + for (ip, port) in addrs.iter().copied() { + entry.insert(SocketAddr::new(ip, port)); + } + } + } + + if self.decision.ipv6_me { + let map = self.proxy_map_v6.read().await; + for (dc, addrs) in map.iter() { + let entry = endpoints_by_dc.entry(*dc).or_default(); + for (ip, port) in addrs.iter().copied() { + entry.insert(SocketAddr::new(ip, port)); + } + } + } + + endpoints_by_dc + .values() + .map(|endpoints| self.required_writers_for_dc_with_floor_mode(endpoints.len(), false)) + .sum() + } + + pub(super) async fn can_open_writer_for_contour( + &self, + contour: WriterContour, + allow_coverage_override: bool, + ) -> bool { + let (active_writers, warm_writers, _) = self.non_draining_writer_counts_by_contour().await; + match contour { + WriterContour::Active => { + let active_cap = self.adaptive_floor_active_cap_configured_total(); + if active_writers < active_cap { + return true; + } + if !allow_coverage_override { + return false; + } + let coverage_required = self.active_coverage_required_total().await; + active_writers < coverage_required + } + WriterContour::Warm => warm_writers < self.adaptive_floor_warm_cap_configured_total(), + WriterContour::Draining => true, + } } pub(super) fn required_writers_for_dc_with_floor_mode( diff --git a/src/transport/middle_proxy/pool_init.rs b/src/transport/middle_proxy/pool_init.rs index 52cbc68..29a70c5 100644 --- a/src/transport/middle_proxy/pool_init.rs +++ b/src/transport/middle_proxy/pool_init.rs @@ -71,6 +71,7 @@ impl MePool { target_writers, rng_clone, connect_concurrency, + true, ) .await }); @@ -114,6 +115,7 @@ impl MePool { target_writers, rng_clone_local, connect_concurrency, + false, ) .await }); @@ -147,6 +149,7 @@ impl MePool { target_writers: usize, rng: Arc, connect_concurrency: usize, + allow_coverage_override: bool, ) -> bool { if addrs.is_empty() { return false; @@ -180,9 +183,17 @@ impl MePool { let pool = Arc::clone(&self); let rng_clone = Arc::clone(&rng); let endpoints_clone = endpoints.clone(); + let generation = self.current_generation(); join.spawn(async move { - pool.connect_endpoints_round_robin(dc, &endpoints_clone, rng_clone.as_ref()) - .await + pool.connect_endpoints_round_robin_with_generation_contour( + dc, + &endpoints_clone, + rng_clone.as_ref(), + generation, + super::pool::WriterContour::Active, + allow_coverage_override, + ) + .await }); } @@ -212,12 +223,25 @@ impl MePool { return true; } if !progress { - warn!( - dc = %dc, - alive = alive_after, - target_writers, - "All ME servers for DC failed at init" - ); + let active_writers_current = self.active_contour_writer_count_total().await; + let active_cap_configured = self.adaptive_floor_active_cap_configured_total(); + if !allow_coverage_override && active_writers_current >= active_cap_configured { + info!( + dc = %dc, + alive = alive_after, + target_writers, + active_writers_current, + active_cap_configured, + "ME init saturation stopped by active writer cap" + ); + } else { + warn!( + dc = %dc, + alive = alive_after, + target_writers, + "All ME servers for DC failed at init" + ); + } return false; } diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 316f3ff..544d048 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -99,6 +99,7 @@ impl MePool { rng, self.current_generation(), WriterContour::Active, + false, ) .await } @@ -110,6 +111,7 @@ impl MePool { rng: &SecureRandom, generation: u64, contour: WriterContour, + allow_coverage_override: bool, ) -> bool { let candidates = self.connectable_endpoints(endpoints).await; if candidates.is_empty() { @@ -120,7 +122,14 @@ impl MePool { let idx = (start + offset) % candidates.len(); let addr = candidates[idx]; match self - .connect_one_with_generation_contour_for_dc(addr, rng, generation, contour, dc) + .connect_one_with_generation_contour_for_dc_with_cap_policy( + addr, + rng, + generation, + contour, + dc, + allow_coverage_override, + ) .await { Ok(()) => return true, diff --git a/src/transport/middle_proxy/pool_reinit.rs b/src/transport/middle_proxy/pool_reinit.rs index 625ccf0..3d9d679 100644 --- a/src/transport/middle_proxy/pool_reinit.rs +++ b/src/transport/middle_proxy/pool_reinit.rs @@ -249,6 +249,7 @@ impl MePool { rng, generation, WriterContour::Warm, + false, ) .await; debug!( diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index 2922ed8..cc1be5b 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -82,11 +82,21 @@ pub(crate) struct MeApiRuntimeSnapshot { pub adaptive_floor_cpu_cores_override: u16, pub adaptive_floor_max_extra_writers_single_per_core: u16, pub adaptive_floor_max_extra_writers_multi_per_core: u16, + pub adaptive_floor_max_active_writers_per_core: u16, + pub adaptive_floor_max_warm_writers_per_core: u16, + pub adaptive_floor_max_active_writers_global: u32, + pub adaptive_floor_max_warm_writers_global: u32, pub adaptive_floor_cpu_cores_detected: u32, pub adaptive_floor_cpu_cores_effective: u32, pub adaptive_floor_global_cap_raw: u64, pub adaptive_floor_global_cap_effective: u64, pub adaptive_floor_target_writers_total: u64, + pub adaptive_floor_active_cap_configured: u64, + pub adaptive_floor_active_cap_effective: u64, + pub adaptive_floor_warm_cap_configured: u64, + pub adaptive_floor_warm_cap_effective: u64, + pub adaptive_floor_active_writers_current: u64, + pub adaptive_floor_warm_writers_current: u64, pub me_keepalive_enabled: bool, pub me_keepalive_interval_secs: u64, pub me_keepalive_jitter_secs: u64, @@ -430,6 +440,18 @@ impl MePool { adaptive_floor_max_extra_writers_multi_per_core: self .me_adaptive_floor_max_extra_writers_multi_per_core .load(Ordering::Relaxed) as u16, + adaptive_floor_max_active_writers_per_core: self + .me_adaptive_floor_max_active_writers_per_core + .load(Ordering::Relaxed) as u16, + adaptive_floor_max_warm_writers_per_core: self + .me_adaptive_floor_max_warm_writers_per_core + .load(Ordering::Relaxed) as u16, + adaptive_floor_max_active_writers_global: self + .me_adaptive_floor_max_active_writers_global + .load(Ordering::Relaxed), + adaptive_floor_max_warm_writers_global: self + .me_adaptive_floor_max_warm_writers_global + .load(Ordering::Relaxed), adaptive_floor_cpu_cores_detected: self .me_adaptive_floor_cpu_cores_detected .load(Ordering::Relaxed), @@ -445,6 +467,24 @@ impl MePool { adaptive_floor_target_writers_total: self .me_adaptive_floor_target_writers_total .load(Ordering::Relaxed), + adaptive_floor_active_cap_configured: self + .me_adaptive_floor_active_cap_configured + .load(Ordering::Relaxed), + adaptive_floor_active_cap_effective: self + .me_adaptive_floor_active_cap_effective + .load(Ordering::Relaxed), + adaptive_floor_warm_cap_configured: self + .me_adaptive_floor_warm_cap_configured + .load(Ordering::Relaxed), + adaptive_floor_warm_cap_effective: self + .me_adaptive_floor_warm_cap_effective + .load(Ordering::Relaxed), + adaptive_floor_active_writers_current: self + .me_adaptive_floor_active_writers_current + .load(Ordering::Relaxed), + adaptive_floor_warm_writers_current: self + .me_adaptive_floor_warm_writers_current + .load(Ordering::Relaxed), me_keepalive_enabled: self.me_keepalive_enabled, me_keepalive_interval_secs: self.me_keepalive_interval.as_secs(), me_keepalive_jitter_secs: self.me_keepalive_jitter.as_secs(), diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 244a08e..90f8d0a 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -86,6 +86,35 @@ impl MePool { contour: WriterContour, writer_dc: i32, ) -> Result<()> { + self.connect_one_with_generation_contour_for_dc_with_cap_policy( + addr, + rng, + generation, + contour, + writer_dc, + false, + ) + .await + } + + pub(super) async fn connect_one_with_generation_contour_for_dc_with_cap_policy( + self: &Arc, + addr: SocketAddr, + rng: &SecureRandom, + generation: u64, + contour: WriterContour, + writer_dc: i32, + allow_coverage_override: bool, + ) -> Result<()> { + if !self + .can_open_writer_for_contour(contour, allow_coverage_override) + .await + { + return Err(ProxyError::Proxy(format!( + "ME {contour:?} writer cap reached" + ))); + } + let secret_len = self.proxy_secret.read().await.secret.len(); if secret_len < 32 { return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into()));