diff --git a/src/api/mod.rs b/src/api/mod.rs index e63de29..63fafad 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -16,6 +16,7 @@ use tracing::{debug, info, warn}; use crate::config::ProxyConfig; use crate::ip_tracker::UserIpTracker; +use crate::startup::StartupTracker; use crate::stats::Stats; use crate::transport::middle_proxy::MePool; use crate::transport::UpstreamManager; @@ -25,6 +26,7 @@ mod events; mod http_utils; mod model; mod runtime_edge; +mod runtime_init; mod runtime_min; mod runtime_stats; mod runtime_watch; @@ -41,6 +43,7 @@ use runtime_edge::{ EdgeConnectionsCacheEntry, build_runtime_connections_summary_data, build_runtime_events_recent_data, }; +use runtime_init::build_runtime_initialization_data; use runtime_min::{ build_runtime_me_pool_state_data, build_runtime_me_quality_data, build_runtime_nat_stun_data, build_runtime_upstream_quality_data, build_security_whitelist_data, @@ -79,6 +82,7 @@ pub(super) struct ApiShared { pub(super) runtime_events: Arc, pub(super) request_id: Arc, pub(super) runtime_state: Arc, + pub(super) startup_tracker: Arc, } impl ApiShared { @@ -99,6 +103,7 @@ pub async fn serve( startup_detected_ip_v4: Option, startup_detected_ip_v6: Option, process_started_at_epoch_secs: u64, + startup_tracker: Arc, ) { let listener = match TcpListener::bind(listen).await { Ok(listener) => listener, @@ -138,6 +143,7 @@ pub async fn serve( )), request_id: Arc::new(AtomicU64::new(1)), runtime_state: runtime_state.clone(), + startup_tracker, }); spawn_runtime_watchers( @@ -251,6 +257,11 @@ async fn handle( let data = build_runtime_gates_data(shared.as_ref(), cfg.as_ref()).await; Ok(success_response(StatusCode::OK, data, revision)) } + ("GET", "/v1/runtime/initialization") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_initialization_data(shared.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } ("GET", "/v1/limits/effective") => { let revision = 
current_revision(&shared.config_path).await?; let data = build_limits_effective_data(cfg.as_ref()); diff --git a/src/api/runtime_init.rs b/src/api/runtime_init.rs new file mode 100644 index 0000000..4bd8943 --- /dev/null +++ b/src/api/runtime_init.rs @@ -0,0 +1,186 @@ +use serde::Serialize; + +use crate::startup::{ + COMPONENT_ME_CONNECTIVITY_PING, COMPONENT_ME_POOL_CONSTRUCT, COMPONENT_ME_POOL_INIT_STAGE1, + COMPONENT_ME_PROXY_CONFIG_V4, COMPONENT_ME_PROXY_CONFIG_V6, COMPONENT_ME_SECRET_FETCH, + StartupComponentStatus, StartupMeStatus, compute_progress_pct, +}; + +use super::ApiShared; + +#[derive(Serialize)] +pub(super) struct RuntimeInitializationComponentData { + pub(super) id: &'static str, + pub(super) title: &'static str, + pub(super) status: &'static str, + pub(super) started_at_epoch_ms: Option, + pub(super) finished_at_epoch_ms: Option, + pub(super) duration_ms: Option, + pub(super) attempts: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) details: Option, +} + +#[derive(Serialize)] +pub(super) struct RuntimeInitializationMeData { + pub(super) status: &'static str, + pub(super) current_stage: String, + pub(super) progress_pct: f64, + pub(super) init_attempt: u32, + pub(super) retry_limit: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) last_error: Option, +} + +#[derive(Serialize)] +pub(super) struct RuntimeInitializationData { + pub(super) status: &'static str, + pub(super) degraded: bool, + pub(super) current_stage: String, + pub(super) progress_pct: f64, + pub(super) started_at_epoch_secs: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) ready_at_epoch_secs: Option, + pub(super) total_elapsed_ms: u64, + pub(super) transport_mode: String, + pub(super) me: RuntimeInitializationMeData, + pub(super) components: Vec, +} + +#[derive(Clone)] +pub(super) struct RuntimeStartupSummaryData { + pub(super) status: &'static str, + pub(super) stage: String, + pub(super) progress_pct: f64, +} + 
+pub(super) async fn build_runtime_startup_summary(shared: &ApiShared) -> RuntimeStartupSummaryData { + let snapshot = shared.startup_tracker.snapshot().await; + let me_pool_progress = current_me_pool_stage_progress(shared).await; + let progress_pct = compute_progress_pct(&snapshot, me_pool_progress); + RuntimeStartupSummaryData { + status: snapshot.status.as_str(), + stage: snapshot.current_stage, + progress_pct, + } +} + +pub(super) async fn build_runtime_initialization_data( + shared: &ApiShared, +) -> RuntimeInitializationData { + let snapshot = shared.startup_tracker.snapshot().await; + let me_pool_progress = current_me_pool_stage_progress(shared).await; + let progress_pct = compute_progress_pct(&snapshot, me_pool_progress); + let me_progress_pct = compute_me_progress_pct(&snapshot, me_pool_progress); + + RuntimeInitializationData { + status: snapshot.status.as_str(), + degraded: snapshot.degraded, + current_stage: snapshot.current_stage, + progress_pct, + started_at_epoch_secs: snapshot.started_at_epoch_secs, + ready_at_epoch_secs: snapshot.ready_at_epoch_secs, + total_elapsed_ms: snapshot.total_elapsed_ms, + transport_mode: snapshot.transport_mode, + me: RuntimeInitializationMeData { + status: snapshot.me.status.as_str(), + current_stage: snapshot.me.current_stage, + progress_pct: me_progress_pct, + init_attempt: snapshot.me.init_attempt, + retry_limit: snapshot.me.retry_limit, + last_error: snapshot.me.last_error, + }, + components: snapshot + .components + .into_iter() + .map(|component| RuntimeInitializationComponentData { + id: component.id, + title: component.title, + status: component.status.as_str(), + started_at_epoch_ms: component.started_at_epoch_ms, + finished_at_epoch_ms: component.finished_at_epoch_ms, + duration_ms: component.duration_ms, + attempts: component.attempts, + details: component.details, + }) + .collect(), + } +} + +fn compute_me_progress_pct( + snapshot: &crate::startup::StartupSnapshot, + me_pool_progress: Option, +) -> f64 { + 
match snapshot.me.status { + StartupMeStatus::Pending => 0.0, + StartupMeStatus::Ready | StartupMeStatus::Failed | StartupMeStatus::Skipped => 100.0, + StartupMeStatus::Initializing => { + let mut total_weight = 0.0f64; + let mut completed_weight = 0.0f64; + for component in &snapshot.components { + if !is_me_component(component.id) { + continue; + } + total_weight += component.weight; + let unit_progress = match component.status { + StartupComponentStatus::Pending => 0.0, + StartupComponentStatus::Running => { + if component.id == COMPONENT_ME_POOL_INIT_STAGE1 { + me_pool_progress.unwrap_or(0.0).clamp(0.0, 1.0) + } else { + 0.0 + } + } + StartupComponentStatus::Ready + | StartupComponentStatus::Failed + | StartupComponentStatus::Skipped => 1.0, + }; + completed_weight += component.weight * unit_progress; + } + if total_weight <= f64::EPSILON { + 0.0 + } else { + ((completed_weight / total_weight) * 100.0).clamp(0.0, 100.0) + } + } + } +} + +fn is_me_component(component_id: &str) -> bool { + matches!( + component_id, + COMPONENT_ME_SECRET_FETCH + | COMPONENT_ME_PROXY_CONFIG_V4 + | COMPONENT_ME_PROXY_CONFIG_V6 + | COMPONENT_ME_POOL_CONSTRUCT + | COMPONENT_ME_POOL_INIT_STAGE1 + | COMPONENT_ME_CONNECTIVITY_PING + ) +} + +async fn current_me_pool_stage_progress(shared: &ApiShared) -> Option { + let snapshot = shared.startup_tracker.snapshot().await; + if snapshot.me.status != StartupMeStatus::Initializing { + return None; + } + + let pool = shared.me_pool.read().await.clone()?; + let status = pool.api_status_snapshot().await; + let configured_dc_groups = status.configured_dc_groups; + let covered_dc_groups = status + .dcs + .iter() + .filter(|dc| dc.alive_writers > 0) + .count(); + + let dc_coverage = ratio_01(covered_dc_groups, configured_dc_groups); + let writer_coverage = ratio_01(status.alive_writers, status.required_writers); + Some((0.7 * dc_coverage + 0.3 * writer_coverage).clamp(0.0, 1.0)) +} + +fn ratio_01(part: usize, total: usize) -> f64 { + if total == 0 { 
+ return 0.0; + } + ((part as f64) / (total as f64)).clamp(0.0, 1.0) +} diff --git a/src/api/runtime_zero.rs b/src/api/runtime_zero.rs index e184671..8def2a9 100644 --- a/src/api/runtime_zero.rs +++ b/src/api/runtime_zero.rs @@ -5,6 +5,7 @@ use serde::Serialize; use crate::config::{MeFloorMode, ProxyConfig, UserMaxUniqueIpsMode}; use super::ApiShared; +use super::runtime_init::build_runtime_startup_summary; #[derive(Serialize)] pub(super) struct SystemInfoData { @@ -34,6 +35,9 @@ pub(super) struct RuntimeGatesData { pub(super) me_runtime_ready: bool, pub(super) me2dc_fallback_enabled: bool, pub(super) use_middle_proxy: bool, + pub(super) startup_status: &'static str, + pub(super) startup_stage: String, + pub(super) startup_progress_pct: f64, } #[derive(Serialize)] @@ -146,6 +150,7 @@ pub(super) async fn build_runtime_gates_data( shared: &ApiShared, cfg: &ProxyConfig, ) -> RuntimeGatesData { + let startup_summary = build_runtime_startup_summary(shared).await; let me_runtime_ready = if !cfg.general.use_middle_proxy { true } else { @@ -164,6 +169,9 @@ pub(super) async fn build_runtime_gates_data( me_runtime_ready, me2dc_fallback_enabled: cfg.general.me2dc_fallback, use_middle_proxy: cfg.general.use_middle_proxy, + startup_status: startup_summary.status, + startup_stage: startup_summary.stage, + startup_progress_pct: startup_summary.progress_pct, } } diff --git a/src/main.rs b/src/main.rs index a46be25..cc6f8ed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,6 +26,7 @@ mod protocol; mod proxy; mod stats; mod stream; +mod startup; mod transport; mod tls_front; mod util; @@ -39,6 +40,14 @@ use crate::proxy::ClientHandler; use crate::stats::beobachten::BeobachtenStore; use crate::stats::telemetry::TelemetryPolicy; use crate::stats::{ReplayChecker, Stats}; +use crate::startup::{ + COMPONENT_API_BOOTSTRAP, COMPONENT_CONFIG_LOAD, COMPONENT_CONFIG_WATCHER_START, + COMPONENT_DC_CONNECTIVITY_PING, COMPONENT_LISTENERS_BIND, COMPONENT_ME_CONNECTIVITY_PING, + 
COMPONENT_ME_POOL_CONSTRUCT, COMPONENT_ME_POOL_INIT_STAGE1, COMPONENT_ME_PROXY_CONFIG_V4, + COMPONENT_ME_PROXY_CONFIG_V6, COMPONENT_ME_SECRET_FETCH, COMPONENT_METRICS_START, + COMPONENT_NETWORK_PROBE, COMPONENT_RUNTIME_READY, COMPONENT_TLS_FRONT_BOOTSTRAP, + COMPONENT_TRACING_INIT, StartupMeStatus, StartupTracker, +}; use crate::stream::BufferPool; use crate::transport::middle_proxy::{ MePool, ProxyConfigData, fetch_proxy_config_with_raw, format_me_route, format_sample_line, @@ -373,6 +382,10 @@ async fn main() -> std::result::Result<(), Box> { .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); + let startup_tracker = Arc::new(StartupTracker::new(process_started_at_epoch_secs)); + startup_tracker + .start_component(COMPONENT_CONFIG_LOAD, Some("load and validate config".to_string())) + .await; let (config_path, cli_silent, cli_log_level) = parse_cli(); let mut config = match ProxyConfig::load(&config_path) { @@ -399,6 +412,9 @@ async fn main() -> std::result::Result<(), Box> { eprintln!("[telemt] Invalid network.dns_overrides: {}", e); std::process::exit(1); } + startup_tracker + .complete_component(COMPONENT_CONFIG_LOAD, Some("config is ready".to_string())) + .await; let has_rust_log = std::env::var("RUST_LOG").is_ok(); let effective_log_level = if cli_silent { @@ -410,6 +426,9 @@ async fn main() -> std::result::Result<(), Box> { }; let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info")); + startup_tracker + .start_component(COMPONENT_TRACING_INIT, Some("initialize tracing subscriber".to_string())) + .await; // Configure color output based on config let fmt_layer = if config.general.disable_colors { @@ -422,6 +441,9 @@ async fn main() -> std::result::Result<(), Box> { .with(filter_layer) .with(fmt_layer) .init(); + startup_tracker + .complete_component(COMPONENT_TRACING_INIT, Some("tracing initialized".to_string())) + .await; info!("Telemt MTProxy v{}", env!("CARGO_PKG_VERSION")); info!("Log level: {}", effective_log_level); @@ 
-498,6 +520,9 @@ async fn main() -> std::result::Result<(), Box> { let initial_admission_open = !config.general.use_middle_proxy; let (admission_tx, admission_rx) = watch::channel(initial_admission_open); let api_me_pool = Arc::new(RwLock::new(None::>)); + startup_tracker + .start_component(COMPONENT_API_BOOTSTRAP, Some("spawn API listener task".to_string())) + .await; if config.server.api.enabled { let listen = match config.server.api.listen.parse::() { @@ -519,6 +544,7 @@ async fn main() -> std::result::Result<(), Box> { let config_rx_api = api_config_rx.clone(); let admission_rx_api = admission_rx.clone(); let config_path_api = std::path::PathBuf::from(&config_path); + let startup_tracker_api = startup_tracker.clone(); tokio::spawn(async move { api::serve( listen, @@ -532,10 +558,31 @@ async fn main() -> std::result::Result<(), Box> { None, None, process_started_at_epoch_secs, + startup_tracker_api, ) .await; }); + startup_tracker + .complete_component( + COMPONENT_API_BOOTSTRAP, + Some(format!("api task spawned on {}", listen)), + ) + .await; + } else { + startup_tracker + .skip_component( + COMPONENT_API_BOOTSTRAP, + Some("server.api.listen has zero port".to_string()), + ) + .await; } + } else { + startup_tracker + .skip_component( + COMPONENT_API_BOOTSTRAP, + Some("server.api.enabled is false".to_string()), + ) + .await; } let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len()); @@ -547,6 +594,12 @@ async fn main() -> std::result::Result<(), Box> { } // Start TLS front fetching in background immediately, in parallel with STUN probing. 
+ startup_tracker + .start_component( + COMPONENT_TLS_FRONT_BOOTSTRAP, + Some("initialize TLS front cache/bootstrap tasks".to_string()), + ) + .await; let tls_cache: Option> = if config.censorship.tls_emulation { let cache = Arc::new(TlsFrontCache::new( &tls_domains, @@ -667,9 +720,26 @@ async fn main() -> std::result::Result<(), Box> { Some(cache) } else { + startup_tracker + .skip_component( + COMPONENT_TLS_FRONT_BOOTSTRAP, + Some("censorship.tls_emulation is false".to_string()), + ) + .await; None }; + if tls_cache.is_some() { + startup_tracker + .complete_component( + COMPONENT_TLS_FRONT_BOOTSTRAP, + Some("tls front cache is initialized".to_string()), + ) + .await; + } + startup_tracker + .start_component(COMPONENT_NETWORK_PROBE, Some("probe network capabilities".to_string())) + .await; let probe = run_probe( &config.network, config.general.middle_proxy_nat_probe, @@ -678,6 +748,12 @@ async fn main() -> std::result::Result<(), Box> { .await?; let decision = decide_network_capabilities(&config.network, &probe); log_probe_result(&probe, &decision); + startup_tracker + .complete_component( + COMPONENT_NETWORK_PROBE, + Some("network capabilities determined".to_string()), + ) + .await; let prefer_ipv6 = decision.prefer_ipv6(); let mut use_middle_proxy = config.general.use_middle_proxy; @@ -701,6 +777,59 @@ async fn main() -> std::result::Result<(), Box> { } } + if use_middle_proxy { + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_SECRET_FETCH) + .await; + startup_tracker + .start_component( + COMPONENT_ME_SECRET_FETCH, + Some("fetch proxy-secret from source/cache".to_string()), + ) + .await; + startup_tracker + .set_me_retry_limit(if !me2dc_fallback || me_init_retry_attempts == 0 { + "unlimited".to_string() + } else { + me_init_retry_attempts.to_string() + }) + .await; + } else { + startup_tracker + .set_me_status(StartupMeStatus::Skipped, "skipped") + .await; + startup_tracker + .skip_component( + COMPONENT_ME_SECRET_FETCH, + 
Some("middle proxy mode disabled".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("middle proxy mode disabled".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("middle proxy mode disabled".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("middle proxy mode disabled".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("middle proxy mode disabled".to_string()), + ) + .await; + } + // ===================================================================== // Middle Proxy initialization (if enabled) // ===================================================================== @@ -738,6 +867,9 @@ async fn main() -> std::result::Result<(), Box> { { Ok(proxy_secret) => break Some(proxy_secret), Err(e) => { + startup_tracker + .set_me_last_error(Some(e.to_string())) + .await; if me2dc_fallback { error!( error = %e, @@ -757,6 +889,12 @@ async fn main() -> std::result::Result<(), Box> { }; match proxy_secret { Some(proxy_secret) => { + startup_tracker + .complete_component( + COMPONENT_ME_SECRET_FETCH, + Some("proxy-secret loaded".to_string()), + ) + .await; info!( secret_len = proxy_secret.len(), key_sig = format_args!( @@ -775,6 +913,15 @@ async fn main() -> std::result::Result<(), Box> { "Proxy-secret loaded" ); + startup_tracker + .start_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("load startup proxy-config v4".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_PROXY_CONFIG_V4) + .await; let cfg_v4 = load_startup_proxy_config_snapshot( "https://core.telegram.org/getProxyConfig", config.general.proxy_config_v4_cache_path.as_deref(), @@ -782,6 +929,30 @@ async fn main() -> std::result::Result<(), Box> { "getProxyConfig", ) .await; + if cfg_v4.is_some() { + startup_tracker + .complete_component( + 
COMPONENT_ME_PROXY_CONFIG_V4, + Some("proxy-config v4 loaded".to_string()), + ) + .await; + } else { + startup_tracker + .fail_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("proxy-config v4 unavailable".to_string()), + ) + .await; + } + startup_tracker + .start_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("load startup proxy-config v6".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_PROXY_CONFIG_V6) + .await; let cfg_v6 = load_startup_proxy_config_snapshot( "https://core.telegram.org/getProxyConfigV6", config.general.proxy_config_v6_cache_path.as_deref(), @@ -789,8 +960,32 @@ async fn main() -> std::result::Result<(), Box> { "getProxyConfigV6", ) .await; + if cfg_v6.is_some() { + startup_tracker + .complete_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("proxy-config v6 loaded".to_string()), + ) + .await; + } else { + startup_tracker + .fail_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("proxy-config v6 unavailable".to_string()), + ) + .await; + } if let (Some(cfg_v4), Some(cfg_v6)) = (cfg_v4, cfg_v6) { + startup_tracker + .start_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("construct ME pool".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_POOL_CONSTRUCT) + .await; let pool = MePool::new( proxy_tag.clone(), proxy_secret, @@ -857,12 +1052,44 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_route_inline_recovery_attempts, config.general.me_route_inline_recovery_wait_ms, ); + startup_tracker + .complete_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("ME pool object created".to_string()), + ) + .await; + *api_me_pool.write().await = Some(pool.clone()); + startup_tracker + .start_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("initialize ME pool writers".to_string()), + ) + .await; + startup_tracker + .set_me_status( + StartupMeStatus::Initializing, + COMPONENT_ME_POOL_INIT_STAGE1, + ) + .await; let mut 
init_attempt: u32 = 0; loop { init_attempt = init_attempt.saturating_add(1); + startup_tracker.set_me_init_attempt(init_attempt).await; match pool.init(pool_size, &rng).await { Ok(()) => { + startup_tracker + .set_me_last_error(None) + .await; + startup_tracker + .complete_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME pool initialized".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Ready, "ready") + .await; info!( attempt = init_attempt, "Middle-End pool initialized successfully" @@ -882,8 +1109,20 @@ async fn main() -> std::result::Result<(), Box> { break Some(pool); } Err(e) => { + startup_tracker + .set_me_last_error(Some(e.to_string())) + .await; let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; if retries_limited && init_attempt >= me_init_retry_attempts { + startup_tracker + .fail_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME init retry budget exhausted".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Failed, "failed") + .await; error!( error = %e, attempt = init_attempt, @@ -923,10 +1162,60 @@ async fn main() -> std::result::Result<(), Box> { } } } else { + startup_tracker + .skip_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("ME configs are incomplete".to_string()), + ) + .await; + startup_tracker + .fail_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME configs are incomplete".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Failed, "failed") + .await; None } } - None => None, + None => { + startup_tracker + .fail_component( + COMPONENT_ME_SECRET_FETCH, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .skip_component( + 
COMPONENT_ME_POOL_CONSTRUCT, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .fail_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Failed, "failed") + .await; + None + } } } else { None @@ -934,12 +1223,33 @@ async fn main() -> std::result::Result<(), Box> { // If ME failed to initialize, force direct-only mode. if me_pool.is_some() { + startup_tracker + .set_transport_mode("middle_proxy") + .await; + startup_tracker + .set_degraded(false) + .await; info!("Transport: Middle-End Proxy - all DC-over-RPC"); } else { let _ = use_middle_proxy; use_middle_proxy = false; // Make runtime config reflect direct-only mode for handlers. config.general.use_middle_proxy = false; + startup_tracker + .set_transport_mode("direct") + .await; + startup_tracker + .set_degraded(true) + .await; + if me2dc_fallback { + startup_tracker + .set_me_status(StartupMeStatus::Failed, "fallback_to_direct") + .await; + } else { + startup_tracker + .set_me_status(StartupMeStatus::Skipped, "skipped") + .await; + } info!("Transport: Direct DC - TCP - standard DC-over-TCP"); } @@ -954,6 +1264,21 @@ async fn main() -> std::result::Result<(), Box> { let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096)); // Middle-End ping before DC connectivity + if me_pool.is_some() { + startup_tracker + .start_component( + COMPONENT_ME_CONNECTIVITY_PING, + Some("run startup ME connectivity check".to_string()), + ) + .await; + } else { + startup_tracker + .skip_component( + COMPONENT_ME_CONNECTIVITY_PING, + Some("ME pool is not available".to_string()), + ) + .await; + } if let Some(ref pool) = me_pool { let me_results = run_me_ping(pool, &rng).await; @@ -1023,9 +1348,21 @@ async fn main() -> std::result::Result<(), Box> { } } info!("============================================================"); + startup_tracker + .complete_component( + 
COMPONENT_ME_CONNECTIVITY_PING, + Some("startup ME connectivity check completed".to_string()), + ) + .await; } info!("================= Telegram DC Connectivity ================="); + startup_tracker + .start_component( + COMPONENT_DC_CONNECTIVITY_PING, + Some("run startup DC connectivity check".to_string()), + ) + .await; let ping_results = upstream_manager .ping_all_dcs( @@ -1105,9 +1442,21 @@ async fn main() -> std::result::Result<(), Box> { info!("============================================================"); } } + startup_tracker + .complete_component( + COMPONENT_DC_CONNECTIVITY_PING, + Some("startup DC connectivity check completed".to_string()), + ) + .await; let initialized_secs = process_started_at.elapsed().as_secs(); let second_suffix = if initialized_secs == 1 { "" } else { "s" }; + startup_tracker + .start_component( + COMPONENT_RUNTIME_READY, + Some("finalize startup runtime state".to_string()), + ) + .await; info!("===================== Telegram Startup ====================="); info!( " DC/ME Initialized in {} second{}", @@ -1150,6 +1499,12 @@ async fn main() -> std::result::Result<(), Box> { // ── Hot-reload watcher ──────────────────────────────────────────────── // Uses inotify to detect file changes instantly (SIGHUP also works). // detected_ip_v4/v6 are passed so newly added users get correct TG links. 
+ startup_tracker + .start_component( + COMPONENT_CONFIG_WATCHER_START, + Some("spawn config hot-reload watcher".to_string()), + ) + .await; let (config_rx, mut log_level_rx): ( tokio::sync::watch::Receiver>, tokio::sync::watch::Receiver, @@ -1159,6 +1514,12 @@ async fn main() -> std::result::Result<(), Box> { detected_ip_v4, detected_ip_v6, ); + startup_tracker + .complete_component( + COMPONENT_CONFIG_WATCHER_START, + Some("config hot-reload watcher started".to_string()), + ) + .await; let mut config_rx_api_bridge = config_rx.clone(); let api_config_tx_bridge = api_config_tx.clone(); tokio::spawn(async move { @@ -1300,6 +1661,12 @@ async fn main() -> std::result::Result<(), Box> { }); } + startup_tracker + .start_component( + COMPONENT_LISTENERS_BIND, + Some("bind TCP/Unix listeners".to_string()), + ) + .await; let mut listeners = Vec::new(); for listener_conf in &config.server.listeners { @@ -1550,6 +1917,16 @@ async fn main() -> std::result::Result<(), Box> { } }); } + startup_tracker + .complete_component( + COMPONENT_LISTENERS_BIND, + Some(format!( + "listeners configured tcp={} unix={}", + listeners.len(), + has_unix_listener + )), + ) + .await; if listeners.is_empty() && !has_unix_listener { error!("No listeners. 
Exiting."); @@ -1583,6 +1960,12 @@ async fn main() -> std::result::Result<(), Box> { }); if let Some(port) = config.server.metrics_port { + startup_tracker + .start_component( + COMPONENT_METRICS_START, + Some(format!("spawn metrics endpoint on {}", port)), + ) + .await; let stats = stats.clone(); let beobachten = beobachten.clone(); let config_rx_metrics = config_rx.clone(); @@ -1599,8 +1982,29 @@ async fn main() -> std::result::Result<(), Box> { ) .await; }); + startup_tracker + .complete_component( + COMPONENT_METRICS_START, + Some("metrics task spawned".to_string()), + ) + .await; + } else { + startup_tracker + .skip_component( + COMPONENT_METRICS_START, + Some("server.metrics_port is not configured".to_string()), + ) + .await; } + startup_tracker + .complete_component( + COMPONENT_RUNTIME_READY, + Some("startup pipeline is fully initialized".to_string()), + ) + .await; + startup_tracker.mark_ready().await; + for (listener, listener_proxy_protocol) in listeners { let mut config_rx: tokio::sync::watch::Receiver> = config_rx.clone(); let mut admission_rx_tcp = admission_rx.clone(); diff --git a/src/startup.rs b/src/startup.rs new file mode 100644 index 0000000..f6f857c --- /dev/null +++ b/src/startup.rs @@ -0,0 +1,373 @@ +use std::time::{Instant, SystemTime, UNIX_EPOCH}; + +use tokio::sync::RwLock; + +pub const COMPONENT_CONFIG_LOAD: &str = "config_load"; +pub const COMPONENT_TRACING_INIT: &str = "tracing_init"; +pub const COMPONENT_API_BOOTSTRAP: &str = "api_bootstrap"; +pub const COMPONENT_TLS_FRONT_BOOTSTRAP: &str = "tls_front_bootstrap"; +pub const COMPONENT_NETWORK_PROBE: &str = "network_probe"; +pub const COMPONENT_ME_SECRET_FETCH: &str = "me_secret_fetch"; +pub const COMPONENT_ME_PROXY_CONFIG_V4: &str = "me_proxy_config_fetch_v4"; +pub const COMPONENT_ME_PROXY_CONFIG_V6: &str = "me_proxy_config_fetch_v6"; +pub const COMPONENT_ME_POOL_CONSTRUCT: &str = "me_pool_construct"; +pub const COMPONENT_ME_POOL_INIT_STAGE1: &str = "me_pool_init_stage1"; +pub const 
COMPONENT_ME_CONNECTIVITY_PING: &str = "me_connectivity_ping";
pub const COMPONENT_DC_CONNECTIVITY_PING: &str = "dc_connectivity_ping";
pub const COMPONENT_LISTENERS_BIND: &str = "listeners_bind";
pub const COMPONENT_CONFIG_WATCHER_START: &str = "config_watcher_start";
pub const COMPONENT_METRICS_START: &str = "metrics_start";
pub const COMPONENT_RUNTIME_READY: &str = "runtime_ready";

/// Overall startup status of the process.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StartupStatus {
    Initializing,
    Ready,
}

impl StartupStatus {
    /// Returns the lowercase string form of this status.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Initializing => "initializing",
            Self::Ready => "ready",
        }
    }
}

/// Status of a single startup component.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StartupComponentStatus {
    Pending,
    Running,
    Ready,
    Failed,
    Skipped,
}

impl StartupComponentStatus {
    /// Returns the lowercase string form of this status.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Ready => "ready",
            Self::Failed => "failed",
            Self::Skipped => "skipped",
        }
    }
}

/// Status of the ME part of startup.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StartupMeStatus {
    Pending,
    Initializing,
    Ready,
    Failed,
    Skipped,
}

impl StartupMeStatus {
    /// Returns the lowercase string form of this status.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::Initializing => "initializing",
            Self::Ready => "ready",
            Self::Failed => "failed",
            Self::Skipped => "skipped",
        }
    }
}

/// Point-in-time copy of one startup component, as returned by
/// [`StartupTracker::snapshot`].
#[derive(Clone, Debug)]
pub struct StartupComponentSnapshot {
    pub id: &'static str,
    pub title: &'static str,
    /// Relative weight of this component in [`compute_progress_pct`].
    pub weight: f64,
    pub status: StartupComponentStatus,
    pub started_at_epoch_ms: Option<u64>,
    pub finished_at_epoch_ms: Option<u64>,
    pub duration_ms: Option<u64>,
    pub attempts: u32,
    pub details: Option<String>,
}

/// ME-specific startup state; stored inside the tracker and cloned into snapshots.
#[derive(Clone, Debug)]
pub struct StartupMeSnapshot {
    pub status: StartupMeStatus,
    pub current_stage: String,
    pub init_attempt: u32,
    pub retry_limit: String,
    pub last_error: Option<String>,
}

/// Point-in-time copy of the whole startup state.
#[derive(Clone, Debug)]
pub struct StartupSnapshot {
    pub status: StartupStatus,
    pub degraded: bool,
    pub current_stage: String,
    pub started_at_epoch_secs: u64,
    pub ready_at_epoch_secs: Option<u64>,
    /// Milliseconds elapsed since the tracker was created, measured when the
    /// snapshot was taken.
    pub total_elapsed_ms: u64,
    pub transport_mode: String,
    pub me: StartupMeSnapshot,
    pub components: Vec<StartupComponentSnapshot>,
}

/// Mutable per-component record kept inside [`StartupState`].
#[derive(Clone, Debug)]
struct StartupComponent {
    id: &'static str,
    title: &'static str,
    weight: f64,
    status: StartupComponentStatus,
    started_at_epoch_ms: Option<u64>,
    finished_at_epoch_ms: Option<u64>,
    duration_ms: Option<u64>,
    attempts: u32,
    details: Option<String>,
}

/// Mutable startup state guarded by the tracker's lock.
#[derive(Clone, Debug)]
struct StartupState {
    status: StartupStatus,
    degraded: bool,
    current_stage: String,
    started_at_epoch_secs: u64,
    ready_at_epoch_secs: Option<u64>,
    transport_mode: String,
    me: StartupMeSnapshot,
    components: Vec<StartupComponent>,
}

/// Records the progress of process startup, component by component.
pub struct StartupTracker {
    started_at_instant: Instant,
    state: RwLock<StartupState>,
}

impl StartupTracker {
    /// Creates a tracker in the `Initializing` state with every blueprint
    /// component pending.
    ///
    /// `started_at_epoch_secs` is stored as-is and reported in snapshots; the
    /// elapsed-time clock starts at the moment of this call.
    pub fn new(started_at_epoch_secs: u64) -> Self {
        Self {
            started_at_instant: Instant::now(),
            state: RwLock::new(StartupState {
                status: StartupStatus::Initializing,
                degraded: false,
                current_stage: COMPONENT_CONFIG_LOAD.to_string(),
                started_at_epoch_secs,
                ready_at_epoch_secs: None,
                transport_mode: "unknown".to_string(),
                me: StartupMeSnapshot {
                    status: StartupMeStatus::Pending,
                    current_stage: "pending".to_string(),
                    init_attempt: 0,
                    retry_limit: "unlimited".to_string(),
                    last_error: None,
                },
                components: component_blueprint(),
            }),
        }
    }

    /// Replaces the reported transport mode.
    pub async fn set_transport_mode(&self, mode: &'static str) {
        self.state.write().await.transport_mode = mode.to_string();
    }

    /// Sets the `degraded` flag.
    pub async fn set_degraded(&self, degraded: bool) {
        self.state.write().await.degraded = degraded;
    }

    /// Marks component `id` as running and makes it the current stage.
    ///
    /// The start timestamp is recorded only on the first start; every call
    /// increments `attempts` and replaces `details`. The current stage is
    /// updated even when `id` matches no known component.
    pub async fn start_component(&self, id: &'static str, details: Option<String>) {
        let mut guard = self.state.write().await;
        guard.current_stage = id.to_string();
        if let Some(component) = guard
            .components
            .iter_mut()
            .find(|component| component.id == id)
        {
            if component.started_at_epoch_ms.is_none() {
                component.started_at_epoch_ms = Some(now_epoch_ms());
            }
            component.attempts = component.attempts.saturating_add(1);
            component.status = StartupComponentStatus::Running;
            component.details = normalize_details(details);
        }
    }

    /// Finishes component `id` with status `Ready`.
    pub async fn complete_component(&self, id: &'static str, details: Option<String>) {
        self.finish_component(id, StartupComponentStatus::Ready, details)
            .await;
    }

    /// Finishes component `id` with status `Failed`.
    pub async fn fail_component(&self, id: &'static str, details: Option<String>) {
        self.finish_component(id, StartupComponentStatus::Failed, details)
            .await;
    }

    /// Finishes component `id` with status `Skipped`.
    pub async fn skip_component(&self, id: &'static str, details: Option<String>) {
        self.finish_component(id, StartupComponentStatus::Skipped, details)
            .await;
    }

    /// Records the finish time, duration, final `status` and `details` for
    /// component `id`. Unknown ids are ignored.
    async fn finish_component(
        &self,
        id: &'static str,
        status: StartupComponentStatus,
        details: Option<String>,
    ) {
        let mut guard = self.state.write().await;
        let finished_at = now_epoch_ms();
        if let Some(component) = guard
            .components
            .iter_mut()
            .find(|component| component.id == id)
        {
            // A component finished without ever being started counts as a
            // single zero-length attempt.
            if component.started_at_epoch_ms.is_none() {
                component.started_at_epoch_ms = Some(finished_at);
                component.attempts = component.attempts.saturating_add(1);
            }
            component.finished_at_epoch_ms = Some(finished_at);
            // Saturating: the wall clock may have moved backwards since start.
            component.duration_ms = component
                .started_at_epoch_ms
                .map(|started_at| finished_at.saturating_sub(started_at));
            component.status = status;
            component.details = normalize_details(details);
        }
    }

    /// Sets the ME status together with its current stage label.
    pub async fn set_me_status(&self, status: StartupMeStatus, stage: &'static str) {
        let mut guard = self.state.write().await;
        guard.me.status = status;
        guard.me.current_stage = stage.to_string();
    }

    /// Replaces the reported ME retry limit.
    pub async fn set_me_retry_limit(&self, retry_limit: String) {
        self.state.write().await.me.retry_limit = retry_limit;
    }

    /// Replaces the reported ME init attempt counter.
    pub async fn set_me_init_attempt(&self, attempt: u32) {
        self.state.write().await.me.init_attempt = attempt;
    }

    /// Replaces the last ME error; the text is length-capped like component details.
    pub async fn set_me_last_error(&self, error: Option<String>) {
        self.state.write().await.me.last_error = normalize_details(error);
    }

    /// Switches the tracker to `Ready` and records the ready timestamp.
    ///
    /// Calling it again is a no-op, so the first ready timestamp is kept.
    pub async fn mark_ready(&self) {
        let mut guard = self.state.write().await;
        if guard.status == StartupStatus::Ready {
            return;
        }
        guard.status = StartupStatus::Ready;
        guard.current_stage = "ready".to_string();
        guard.ready_at_epoch_secs = Some(now_epoch_secs());
    }

    /// Returns an owned copy of the current state.
    pub async fn snapshot(&self) -> StartupSnapshot {
        let guard = self.state.read().await;
        StartupSnapshot {
            status: guard.status,
            degraded: guard.degraded,
            current_stage: guard.current_stage.clone(),
            started_at_epoch_secs: guard.started_at_epoch_secs,
            ready_at_epoch_secs: guard.ready_at_epoch_secs,
            total_elapsed_ms: self.started_at_instant.elapsed().as_millis() as u64,
            transport_mode: guard.transport_mode.clone(),
            me: guard.me.clone(),
            components: guard
                .components
                .iter()
                .map(|component| StartupComponentSnapshot {
                    id: component.id,
                    title: component.title,
                    weight: component.weight,
                    status: component.status,
                    started_at_epoch_ms: component.started_at_epoch_ms,
                    finished_at_epoch_ms: component.finished_at_epoch_ms,
                    duration_ms: component.duration_ms,
                    attempts: component.attempts,
                    details: component.details.clone(),
                })
                .collect(),
        }
    }
}

/// Computes startup progress as a percentage in `0.0..=100.0`.
///
/// Returns `100.0` as soon as the snapshot status is `Ready`. Otherwise the
/// result is the weight-averaged progress of all components: `Ready`, `Failed`
/// and `Skipped` components count as fully done, `Pending` ones as not started.
/// A `Running` component counts as zero, except `COMPONENT_ME_POOL_INIT_STAGE1`,
/// which contributes `me_stage_progress` (clamped to `0.0..=1.0`, `None`
/// treated as `0.0`). Returns `0.0` when the total weight is effectively zero.
pub fn compute_progress_pct(snapshot: &StartupSnapshot, me_stage_progress: Option<f64>) -> f64 {
    if snapshot.status == StartupStatus::Ready {
        return 100.0;
    }

    let mut total_weight = 0.0f64;
    let mut completed_weight = 0.0f64;

    for component in &snapshot.components {
        total_weight += component.weight;
        let unit_progress = match component.status {
            StartupComponentStatus::Pending => 0.0,
            StartupComponentStatus::Running => {
                if component.id == COMPONENT_ME_POOL_INIT_STAGE1 {
                    me_stage_progress.unwrap_or(0.0).clamp(0.0, 1.0)
                } else {
                    0.0
                }
            }
            StartupComponentStatus::Ready
            | StartupComponentStatus::Failed
            | StartupComponentStatus::Skipped => 1.0,
        };
        completed_weight += component.weight * unit_progress;
    }

    if total_weight <= f64::EPSILON {
        0.0
    } else {
        ((completed_weight / total_weight) * 100.0).clamp(0.0, 100.0)
    }
}

/// Builds the initial component list, all pending, with their progress weights.
fn component_blueprint() -> Vec<StartupComponent> {
    vec![
        component(COMPONENT_CONFIG_LOAD, "Config load", 5.0),
        component(COMPONENT_TRACING_INIT, "Tracing init", 3.0),
        component(COMPONENT_API_BOOTSTRAP, "API bootstrap", 5.0),
        component(COMPONENT_TLS_FRONT_BOOTSTRAP, "TLS front bootstrap", 5.0),
        component(COMPONENT_NETWORK_PROBE, "Network probe", 10.0),
        component(COMPONENT_ME_SECRET_FETCH, "ME secret fetch", 8.0),
        component(COMPONENT_ME_PROXY_CONFIG_V4, "ME config v4 fetch", 4.0),
        component(COMPONENT_ME_PROXY_CONFIG_V6, "ME config v6 fetch", 4.0),
        component(COMPONENT_ME_POOL_CONSTRUCT, "ME pool construct", 6.0),
        component(COMPONENT_ME_POOL_INIT_STAGE1, "ME pool init stage1", 24.0),
        component(COMPONENT_ME_CONNECTIVITY_PING, "ME connectivity ping", 6.0),
        component(COMPONENT_DC_CONNECTIVITY_PING, "DC connectivity ping", 8.0),
        component(COMPONENT_LISTENERS_BIND, "Listener bind", 8.0),
        component(COMPONENT_CONFIG_WATCHER_START, "Config watcher start", 2.0),
        component(COMPONENT_METRICS_START, "Metrics start", 1.0),
        component(COMPONENT_RUNTIME_READY, "Runtime ready", 1.0),
    ]
}

/// Creates a pending component record with no timestamps and zero attempts.
fn component(id: &'static str, title: &'static str, weight: f64) -> StartupComponent {
    StartupComponent {
        id,
        title,
        weight,
        status: StartupComponentStatus::Pending,
        started_at_epoch_ms: None,
        finished_at_epoch_ms: None,
        duration_ms: None,
        attempts: 0,
        details: None,
    }
}

/// Caps a detail string at 256 bytes.
///
/// Strings of 256 bytes or fewer are returned unchanged. Longer strings are
/// cut at the largest UTF-8 character boundary that does not exceed 256 bytes,
/// so the result is always valid text and the cut never panics on multi-byte
/// characters.
fn normalize_details(details: Option<String>) -> Option<String> {
    const MAX_DETAIL_BYTES: usize = 256;

    details.map(|mut detail| {
        if detail.len() > MAX_DETAIL_BYTES {
            let mut cut = MAX_DETAIL_BYTES;
            // Offset 0 is always a boundary, so this loop terminates.
            while !detail.is_char_boundary(cut) {
                cut -= 1;
            }
            detail.truncate(cut);
        }
        detail
    })
}

/// Current wall-clock time as whole seconds since the Unix epoch
/// (0 if the clock reads earlier than the epoch).
fn now_epoch_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}

/// Current wall-clock time as milliseconds since the Unix epoch
/// (0 if the clock reads earlier than the epoch).
fn now_epoch_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64
}