//! telemt — Telegram MTProto Proxy

#![allow(unused_assignments)]

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use rand::Rng;
use tokio::net::TcpListener;
use tokio::signal;
use tokio::sync::{RwLock, Semaphore, mpsc, watch};
use tracing::{debug, error, info, warn};
use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload};

#[cfg(unix)]
use tokio::net::UnixListener;

mod cli;
mod api;
mod config;
mod crypto;
mod error;
mod ip_tracker;
mod network;
mod metrics;
mod protocol;
mod proxy;
mod stats;
mod stream;
mod startup;
mod transport;
mod tls_front;
mod util;

use crate::config::{LogLevel, ProxyConfig};
use crate::config::hot_reload::spawn_config_watcher;
use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker;
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
use crate::proxy::ClientHandler;
use crate::stats::beobachten::BeobachtenStore;
use crate::stats::telemetry::TelemetryPolicy;
use crate::stats::{ReplayChecker, Stats};
use crate::startup::{
    COMPONENT_API_BOOTSTRAP, COMPONENT_CONFIG_LOAD, COMPONENT_CONFIG_WATCHER_START,
    COMPONENT_DC_CONNECTIVITY_PING, COMPONENT_LISTENERS_BIND, COMPONENT_ME_CONNECTIVITY_PING,
    COMPONENT_ME_POOL_CONSTRUCT, COMPONENT_ME_POOL_INIT_STAGE1, COMPONENT_ME_PROXY_CONFIG_V4,
    COMPONENT_ME_PROXY_CONFIG_V6, COMPONENT_ME_SECRET_FETCH, COMPONENT_METRICS_START,
    COMPONENT_NETWORK_PROBE, COMPONENT_RUNTIME_READY, COMPONENT_TLS_FRONT_BOOTSTRAP,
    COMPONENT_TRACING_INIT, StartupMeStatus, StartupTracker,
};
use crate::stream::BufferPool;
use crate::transport::middle_proxy::{
    MePool, ProxyConfigData, fetch_proxy_config_with_raw, format_me_route, format_sample_line,
    load_proxy_config_cache, run_me_ping, save_proxy_config_cache, MePingFamily, MePingSample,
    MeReinitTrigger,
};
use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes};
use crate::tls_front::TlsFrontCache;

fn parse_cli() -> (String, bool, Option<String>) {
    let mut config_path = "config.toml".to_string();
    let mut silent = false;
    let mut log_level: Option<String> = None;
    let args: Vec<String> = std::env::args().skip(1).collect();

    // Check for --init first (handled before tokio)
    if let Some(init_opts) = cli::parse_init_args(&args) {
        if let Err(e) = cli::run_init(init_opts) {
            eprintln!("[telemt] Init failed: {}", e);
            std::process::exit(1);
        }
        std::process::exit(0);
    }

    let mut i = 0;
    while i < args.len() {
        match args[i].as_str() {
            "--silent" | "-s" => {
                silent = true;
            }
            "--log-level" => {
                i += 1;
                if i < args.len() {
                    log_level = Some(args[i].clone());
                }
            }
            s if s.starts_with("--log-level=") => {
                log_level = Some(s.trim_start_matches("--log-level=").to_string());
            }
            "--help" | "-h" => {
                eprintln!("Usage: telemt [config.toml] [OPTIONS]");
                eprintln!();
                eprintln!("Options:");
                eprintln!("  --silent, -s   Suppress info logs");
                eprintln!("  --log-level    debug|verbose|normal|silent");
                eprintln!("  --help, -h     Show this help");
                eprintln!();
                eprintln!("Setup (fire-and-forget):");
                eprintln!("  --init         Generate config, install systemd service, start");
                eprintln!("  --port         Listen port (default: 443)");
                eprintln!("  --domain       TLS domain for masking (default: www.google.com)");
                eprintln!("  --secret       32-char hex secret (auto-generated if omitted)");
                eprintln!("  --user         Username (default: user)");
                eprintln!("  --config-dir   Config directory (default: /etc/telemt)");
                eprintln!("  --no-start     Don't start the service after install");
                std::process::exit(0);
            }
            "--version" | "-V" => {
                println!("telemt {}", env!("CARGO_PKG_VERSION"));
                std::process::exit(0);
            }
            s if !s.starts_with('-') => {
                config_path = s.to_string();
            }
            other => {
                eprintln!("Unknown option: {}", other);
            }
        }
        i += 1;
    }

    (config_path, silent, log_level)
}

fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
    info!(target: "telemt::links", "--- Proxy Links ({}) ---", host);
    for user_name in config.general.links.show.resolve_users(&config.access.users) {
        if let Some(secret) = config.access.users.get(user_name) {
            info!(target: "telemt::links", "User: {}", user_name);
            if config.general.modes.classic {
                info!(
                    target: "telemt::links",
                    "  Classic: tg://proxy?server={}&port={}&secret={}",
                    host, port, secret
                );
            }
            if config.general.modes.secure {
                info!(
                    target: "telemt::links",
                    "  DD: tg://proxy?server={}&port={}&secret=dd{}",
                    host, port, secret
                );
            }
            if config.general.modes.tls {
                let mut domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
                domains.push(config.censorship.tls_domain.clone());
                for d in &config.censorship.tls_domains {
                    if !domains.contains(d) {
                        domains.push(d.clone());
                    }
                }
                for domain in domains {
                    let domain_hex = hex::encode(&domain);
                    info!(
                        target: "telemt::links",
                        "  EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}",
                        host, port, secret, domain_hex
                    );
                }
            }
        } else {
            warn!(target: "telemt::links", "User '{}' in show_link not found", user_name);
        }
    }
    info!(target: "telemt::links", "------------------------");
}

async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result<()> {
    if let Some(parent) = std::path::Path::new(path).parent()
        && !parent.as_os_str().is_empty()
    {
        tokio::fs::create_dir_all(parent).await?;
    }
    tokio::fs::write(path, payload).await
}

fn unit_label(value: u64, singular: &'static str, plural: &'static str) -> &'static str {
    if value == 1 { singular } else { plural }
}

fn format_uptime(total_secs: u64) -> String {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE;
    const SECS_PER_DAY: u64 = 24 * SECS_PER_HOUR;
    const SECS_PER_MONTH: u64 = 30 * SECS_PER_DAY;
    const SECS_PER_YEAR: u64 = 12 * SECS_PER_MONTH;

    let mut remaining = total_secs;
    let years = remaining / SECS_PER_YEAR;
    remaining %= SECS_PER_YEAR;
    let months = remaining / SECS_PER_MONTH;
    remaining %= SECS_PER_MONTH;
    let days = remaining / SECS_PER_DAY;
    remaining %= SECS_PER_DAY;
    let hours = remaining / SECS_PER_HOUR;
    remaining %= SECS_PER_HOUR;
    let minutes = remaining / SECS_PER_MINUTE;
    let seconds = remaining % SECS_PER_MINUTE;

    // `>=` so that exact boundaries (60s, 3600s, ...) include the larger unit
    // instead of printing just "0 seconds".
    let mut parts = Vec::new();
    if total_secs >= SECS_PER_YEAR {
        parts.push(format!("{} {}", years, unit_label(years, "year", "years")));
    }
    if total_secs >= SECS_PER_MONTH {
        parts.push(format!("{} {}", months, unit_label(months, "month", "months")));
    }
    if total_secs >= SECS_PER_DAY {
        parts.push(format!("{} {}", days, unit_label(days, "day", "days")));
    }
    if total_secs >= SECS_PER_HOUR {
        parts.push(format!("{} {}", hours, unit_label(hours, "hour", "hours")));
    }
    if total_secs >= SECS_PER_MINUTE {
        parts.push(format!("{} {}", minutes, unit_label(minutes, "minute", "minutes")));
    }
    parts.push(format!("{} {}", seconds, unit_label(seconds, "second", "seconds")));

    format!("{} / {} seconds", parts.join(", "), total_secs)
}

async fn wait_until_admission_open(admission_rx: &mut watch::Receiver<bool>) -> bool {
    loop {
        if *admission_rx.borrow() {
            return true;
        }
        if admission_rx.changed().await.is_err() {
            return *admission_rx.borrow();
        }
    }
}

async fn load_startup_proxy_config_snapshot(
    url: &str,
    cache_path: Option<&str>,
    me2dc_fallback: bool,
    label: &'static str,
) -> Option<ProxyConfigData> {
    loop {
        match fetch_proxy_config_with_raw(url).await {
            Ok((cfg, raw)) => {
                if !cfg.map.is_empty() {
                    if let Some(path) = cache_path
                        && let Err(e) = save_proxy_config_cache(path, &raw).await
                    {
                        warn!(error = %e, path, snapshot = label, "Failed to store startup proxy-config cache");
                    }
                    return Some(cfg);
                }
                warn!(snapshot = label, url, "Startup proxy-config is empty; trying disk cache");
                if let Some(path) = cache_path {
                    match load_proxy_config_cache(path).await {
                        Ok(cached) if !cached.map.is_empty() => {
                            info!(
                                snapshot = label,
                                path,
                                proxy_for_lines = cached.proxy_for_lines,
                                "Loaded startup proxy-config from disk cache"
                            );
                            return Some(cached);
                        }
                        Ok(_) => {
                            warn!(
                                snapshot = label,
                                path,
                                "Startup proxy-config cache is empty; ignoring cache file"
                            );
                        }
                        Err(cache_err) => {
                            debug!(
                                snapshot = label,
                                path,
                                error = %cache_err,
                                "Startup proxy-config cache unavailable"
                            );
                        }
                    }
                }
                if me2dc_fallback {
                    error!(
                        snapshot = label,
                        "Startup proxy-config unavailable and no saved config found; falling back to direct mode"
                    );
                    return None;
                }
                warn!(
                    snapshot = label,
                    retry_in_secs = 2,
                    "Startup proxy-config unavailable and no saved config found; retrying because me2dc_fallback=false"
                );
                tokio::time::sleep(Duration::from_secs(2)).await;
            }
            Err(fetch_err) => {
                if let Some(path) = cache_path {
                    match load_proxy_config_cache(path).await {
                        Ok(cached) if !cached.map.is_empty() => {
                            info!(
                                snapshot = label,
                                path,
                                proxy_for_lines = cached.proxy_for_lines,
                                "Loaded startup proxy-config from disk cache"
                            );
                            return Some(cached);
                        }
                        Ok(_) => {
                            warn!(
                                snapshot = label,
                                path,
                                "Startup proxy-config cache is empty; ignoring cache file"
                            );
                        }
                        Err(cache_err) => {
                            debug!(
                                snapshot = label,
                                path,
                                error = %cache_err,
                                "Startup proxy-config cache unavailable"
                            );
                        }
                    }
                }
                if me2dc_fallback {
                    error!(
                        snapshot = label,
                        error = %fetch_err,
                        "Startup proxy-config unavailable and no cached data; falling back to direct mode"
                    );
                    return None;
                }
                warn!(
                    snapshot = label,
                    error = %fetch_err,
                    retry_in_secs = 2,
                    "Startup proxy-config unavailable; retrying because me2dc_fallback=false"
                );
                tokio::time::sleep(Duration::from_secs(2)).await;
            }
        }
    }
}

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    let process_started_at = Instant::now();
    let process_started_at_epoch_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let startup_tracker = Arc::new(StartupTracker::new(process_started_at_epoch_secs));
    startup_tracker
        .start_component(COMPONENT_CONFIG_LOAD, Some("load and validate config".to_string()))
        .await;

    let (config_path, cli_silent, cli_log_level) = parse_cli();
    let mut config = match ProxyConfig::load(&config_path) {
        Ok(c) => c,
        Err(e) => {
            if std::path::Path::new(&config_path).exists() {
                eprintln!("[telemt] Error: {}", e);
                std::process::exit(1);
            } else {
                let default = ProxyConfig::default();
                std::fs::write(&config_path, toml::to_string_pretty(&default).unwrap()).unwrap();
                eprintln!("[telemt] Created default config at {}", config_path);
                default
            }
        }
    };
    if let Err(e) = config.validate() {
        eprintln!("[telemt] Invalid config: {}", e);
        std::process::exit(1);
    }
    if let Err(e) = crate::network::dns_overrides::install_entries(&config.network.dns_overrides) {
        eprintln!("[telemt] Invalid network.dns_overrides: {}", e);
        std::process::exit(1);
    }
    startup_tracker
        .complete_component(COMPONENT_CONFIG_LOAD, Some("config is ready".to_string()))
        .await;

    let has_rust_log = std::env::var("RUST_LOG").is_ok();
    let effective_log_level = if cli_silent {
        LogLevel::Silent
    } else if let Some(ref s) = cli_log_level {
        LogLevel::from_str_loose(s)
    } else {
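        // No CLI override and not --silent: fall back to the level from config.toml.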
        config.general.log_level.clone()
    };

    let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info"));
    startup_tracker
        .start_component(COMPONENT_TRACING_INIT, Some("initialize tracing subscriber".to_string()))
        .await;
    // Configure color output based on config
    let fmt_layer = fmt::Layer::default().with_ansi(!config.general.disable_colors);
    tracing_subscriber::registry()
        .with(filter_layer)
        .with(fmt_layer)
        .init();
    startup_tracker
        .complete_component(COMPONENT_TRACING_INIT, Some("tracing initialized".to_string()))
        .await;

    info!("Telemt MTProxy v{}", env!("CARGO_PKG_VERSION"));
    info!("Log level: {}", effective_log_level);
    if config.general.disable_colors {
        info!("Colors: disabled");
    }
    info!(
        "Modes: classic={} secure={} tls={}",
        config.general.modes.classic, config.general.modes.secure, config.general.modes.tls
    );
    if config.general.modes.classic {
        warn!("Classic mode is vulnerable to DPI detection; enable only for legacy clients");
    }
    info!("TLS domain: {}", config.censorship.tls_domain);
    if let Some(ref sock) = config.censorship.mask_unix_sock {
        info!("Mask: {} -> unix:{}", config.censorship.mask, sock);
        if !std::path::Path::new(sock).exists() {
            warn!(
                "Unix socket '{}' does not exist yet. Masking will fail until it appears.",
                sock
            );
        }
    } else {
        info!(
            "Mask: {} -> {}:{}",
            config.censorship.mask,
            config
                .censorship
                .mask_host
                .as_deref()
                .unwrap_or(&config.censorship.tls_domain),
            config.censorship.mask_port
        );
    }
    if config.censorship.tls_domain == "www.google.com" {
        warn!("Using default tls_domain. Consider setting a custom domain.");
    }

    let stats = Arc::new(Stats::new());
    stats.apply_telemetry_policy(TelemetryPolicy::from_config(&config.general.telemetry));
    let upstream_manager = Arc::new(UpstreamManager::new(
        config.upstreams.clone(),
        config.general.upstream_connect_retry_attempts,
        config.general.upstream_connect_retry_backoff_ms,
        config.general.upstream_connect_budget_ms,
        config.general.upstream_unhealthy_fail_threshold,
        config.general.upstream_connect_failfast_hard_errors,
        stats.clone(),
    ));
    let ip_tracker = Arc::new(UserIpTracker::new());
    ip_tracker.load_limits(&config.access.user_max_unique_ips).await;
    ip_tracker
        .set_limit_policy(
            config.access.user_max_unique_ips_mode,
            config.access.user_max_unique_ips_window_secs,
        )
        .await;
    if !config.access.user_max_unique_ips.is_empty() {
        info!(
            "IP limits configured for {} users",
            config.access.user_max_unique_ips.len()
        );
    }
    if !config.network.dns_overrides.is_empty() {
        info!(
            "Runtime DNS overrides configured: {} entries",
            config.network.dns_overrides.len()
        );
    }

    let (api_config_tx, api_config_rx) = watch::channel(Arc::new(config.clone()));
    let initial_admission_open = !config.general.use_middle_proxy;
    let (admission_tx, admission_rx) = watch::channel(initial_admission_open);
    let api_me_pool = Arc::new(RwLock::new(None::<Arc<MePool>>));
    startup_tracker
        .start_component(COMPONENT_API_BOOTSTRAP, Some("spawn API listener task".to_string()))
        .await;
    if config.server.api.enabled {
        let listen = match config.server.api.listen.parse::<SocketAddr>() {
            Ok(listen) => listen,
            Err(error) => {
                warn!(
                    error = %error,
                    listen = %config.server.api.listen,
                    "Invalid server.api.listen; API is disabled"
                );
                SocketAddr::from(([127, 0, 0, 1], 0))
            }
        };
        if listen.port() != 0 {
            let stats_api = stats.clone();
            let ip_tracker_api = ip_tracker.clone();
            let me_pool_api = api_me_pool.clone();
            let upstream_manager_api = upstream_manager.clone();
            let config_rx_api = api_config_rx.clone();
            let admission_rx_api = admission_rx.clone();
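            // Hand owned clones to the API task so it can outlive this scope
            // without borrowing from main().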
            let config_path_api = std::path::PathBuf::from(&config_path);
            let startup_tracker_api = startup_tracker.clone();
            tokio::spawn(async move {
                api::serve(
                    listen,
                    stats_api,
                    ip_tracker_api,
                    me_pool_api,
                    upstream_manager_api,
                    config_rx_api,
                    admission_rx_api,
                    config_path_api,
                    None,
                    None,
                    process_started_at_epoch_secs,
                    startup_tracker_api,
                )
                .await;
            });
            startup_tracker
                .complete_component(
                    COMPONENT_API_BOOTSTRAP,
                    Some(format!("api task spawned on {}", listen)),
                )
                .await;
        } else {
            startup_tracker
                .skip_component(
                    COMPONENT_API_BOOTSTRAP,
                    Some("server.api.listen has zero port".to_string()),
                )
                .await;
        }
    } else {
        startup_tracker
            .skip_component(
                COMPONENT_API_BOOTSTRAP,
                Some("server.api.enabled is false".to_string()),
            )
            .await;
    }

    let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
    tls_domains.push(config.censorship.tls_domain.clone());
    for d in &config.censorship.tls_domains {
        if !tls_domains.contains(d) {
            tls_domains.push(d.clone());
        }
    }

    // Start TLS front fetching in background immediately, in parallel with STUN probing.
    startup_tracker
        .start_component(
            COMPONENT_TLS_FRONT_BOOTSTRAP,
            Some("initialize TLS front cache/bootstrap tasks".to_string()),
        )
        .await;
    let tls_cache: Option<Arc<TlsFrontCache>> = if config.censorship.tls_emulation {
        let cache = Arc::new(TlsFrontCache::new(
            &tls_domains,
            config.censorship.fake_cert_len,
            &config.censorship.tls_front_dir,
        ));
        cache.load_from_disk().await;
        let port = config.censorship.mask_port;
        let proxy_protocol = config.censorship.mask_proxy_protocol;
        let mask_host = config
            .censorship
            .mask_host
            .clone()
            .unwrap_or_else(|| config.censorship.tls_domain.clone());
        let mask_unix_sock = config.censorship.mask_unix_sock.clone();
        let fetch_timeout = Duration::from_secs(5);

        let cache_initial = cache.clone();
        let domains_initial = tls_domains.clone();
        let host_initial = mask_host.clone();
        let unix_sock_initial = mask_unix_sock.clone();
        let upstream_initial = upstream_manager.clone();
        tokio::spawn(async move {
            let mut join = tokio::task::JoinSet::new();
            for domain in domains_initial {
                let cache_domain = cache_initial.clone();
                let host_domain = host_initial.clone();
                let unix_sock_domain = unix_sock_initial.clone();
                let upstream_domain = upstream_initial.clone();
                join.spawn(async move {
                    match crate::tls_front::fetcher::fetch_real_tls(
                        &host_domain,
                        port,
                        &domain,
                        fetch_timeout,
                        Some(upstream_domain),
                        proxy_protocol,
                        unix_sock_domain.as_deref(),
                    )
                    .await
                    {
                        Ok(res) => cache_domain.update_from_fetch(&domain, res).await,
                        Err(e) => {
                            warn!(domain = %domain, error = %e, "TLS emulation initial fetch failed")
                        }
                    }
                });
            }
            while let Some(res) = join.join_next().await {
                if let Err(e) = res {
                    warn!(error = %e, "TLS emulation initial fetch task join failed");
                }
            }
        });

        let cache_timeout = cache.clone();
        let domains_timeout = tls_domains.clone();
        let fake_cert_len = config.censorship.fake_cert_len;
        tokio::spawn(async move {
            tokio::time::sleep(fetch_timeout).await;
            for domain in domains_timeout {
                let cached = cache_timeout.get(&domain).await;
                if cached.domain == "default" {
                    warn!(
                        domain = %domain,
                        timeout_secs = fetch_timeout.as_secs(),
                        fake_cert_len,
                        "TLS-front fetch not ready within timeout; using cache/default fake cert fallback"
                    );
                }
            }
        });

        // Periodic refresh with jitter.
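        // Base interval 4-6 h plus up to 2 h of extra jitter, so refreshes from
        // many instances do not synchronize into bursts against the mask host.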
        let cache_refresh = cache.clone();
        let domains_refresh = tls_domains.clone();
        let host_refresh = mask_host.clone();
        let unix_sock_refresh = mask_unix_sock.clone();
        let upstream_refresh = upstream_manager.clone();
        tokio::spawn(async move {
            loop {
                let base_secs = rand::rng().random_range(4 * 3600..=6 * 3600);
                let jitter_secs = rand::rng().random_range(0..=7200);
                tokio::time::sleep(Duration::from_secs(base_secs + jitter_secs)).await;
                let mut join = tokio::task::JoinSet::new();
                for domain in domains_refresh.clone() {
                    let cache_domain = cache_refresh.clone();
                    let host_domain = host_refresh.clone();
                    let unix_sock_domain = unix_sock_refresh.clone();
                    let upstream_domain = upstream_refresh.clone();
                    join.spawn(async move {
                        match crate::tls_front::fetcher::fetch_real_tls(
                            &host_domain,
                            port,
                            &domain,
                            fetch_timeout,
                            Some(upstream_domain),
                            proxy_protocol,
                            unix_sock_domain.as_deref(),
                        )
                        .await
                        {
                            Ok(res) => cache_domain.update_from_fetch(&domain, res).await,
                            Err(e) => warn!(domain = %domain, error = %e, "TLS emulation refresh failed"),
                        }
                    });
                }
                while let Some(res) = join.join_next().await {
                    if let Err(e) = res {
                        warn!(error = %e, "TLS emulation refresh task join failed");
                    }
                }
            }
        });

        Some(cache)
    } else {
        startup_tracker
            .skip_component(
                COMPONENT_TLS_FRONT_BOOTSTRAP,
                Some("censorship.tls_emulation is false".to_string()),
            )
            .await;
        None
    };
    if tls_cache.is_some() {
        startup_tracker
            .complete_component(
                COMPONENT_TLS_FRONT_BOOTSTRAP,
                Some("tls front cache is initialized".to_string()),
            )
            .await;
    }

    startup_tracker
        .start_component(COMPONENT_NETWORK_PROBE, Some("probe network capabilities".to_string()))
        .await;
    let probe = run_probe(
        &config.network,
        config.general.middle_proxy_nat_probe,
        config.general.stun_nat_probe_concurrency,
    )
    .await?;
    let decision = decide_network_capabilities(&config.network, &probe);
    log_probe_result(&probe, &decision);
    startup_tracker
        .complete_component(
            COMPONENT_NETWORK_PROBE,
            Some("network capabilities determined".to_string()),
        )
        .await;
    let prefer_ipv6 = decision.prefer_ipv6();
    let mut use_middle_proxy = config.general.use_middle_proxy;
    let beobachten = Arc::new(BeobachtenStore::new());
    let rng = Arc::new(SecureRandom::new());

    // Connection concurrency limit
    let max_connections = Arc::new(Semaphore::new(10_000));

    let me2dc_fallback = config.general.me2dc_fallback;
    let me_init_retry_attempts = config.general.me_init_retry_attempts;
    let me_init_warn_after_attempts: u32 = 3;
    if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me {
        if me2dc_fallback {
            warn!("No usable IP family for Middle Proxy detected; falling back to direct DC");
            use_middle_proxy = false;
        } else {
            warn!(
                "No usable IP family for Middle Proxy detected; me2dc_fallback=false, ME init retries stay active"
            );
        }
    }
    if use_middle_proxy {
        startup_tracker
            .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_SECRET_FETCH)
            .await;
        startup_tracker
            .start_component(
                COMPONENT_ME_SECRET_FETCH,
                Some("fetch proxy-secret from source/cache".to_string()),
            )
            .await;
        startup_tracker
            .set_me_retry_limit(if !me2dc_fallback || me_init_retry_attempts == 0 {
                "unlimited".to_string()
            } else {
                me_init_retry_attempts.to_string()
            })
            .await;
    } else {
        startup_tracker
            .set_me_status(StartupMeStatus::Skipped, "skipped")
            .await;
        startup_tracker
            .skip_component(
                COMPONENT_ME_SECRET_FETCH,
                Some("middle proxy mode disabled".to_string()),
            )
            .await;
        startup_tracker
            .skip_component(
                COMPONENT_ME_PROXY_CONFIG_V4,
                Some("middle proxy mode disabled".to_string()),
            )
            .await;
        startup_tracker
            .skip_component(
                COMPONENT_ME_PROXY_CONFIG_V6,
                Some("middle proxy mode disabled".to_string()),
            )
            .await;
        startup_tracker
            .skip_component(
                COMPONENT_ME_POOL_CONSTRUCT,
                Some("middle proxy mode disabled".to_string()),
            )
            .await;
        startup_tracker
            .skip_component(
                COMPONENT_ME_POOL_INIT_STAGE1,
                Some("middle proxy mode disabled".to_string()),
            )
            .await;
    }

    // =====================================================================
    // Middle Proxy initialization (if enabled)
    // =====================================================================
    let me_pool: Option<Arc<MePool>> = if use_middle_proxy {
        info!("=== Middle Proxy Mode ===");
        let me_nat_probe = config.general.middle_proxy_nat_probe && config.network.stun_use;
        if config.general.middle_proxy_nat_probe && !config.network.stun_use {
            info!("Middle-proxy STUN probing disabled by network.stun_use=false");
        }

        // Global ad_tag (pool default). Used when user has no per-user tag in access.user_ad_tags.
        let proxy_tag = config
            .general
            .ad_tag
            .as_ref()
            .map(|tag| hex::decode(tag).expect("general.ad_tag must be validated before startup"));

        // =============================================================
        // CRITICAL: Download Telegram proxy-secret (NOT user secret!)
        //
        // C MTProxy uses TWO separate secrets:
        //   -S flag    = 16-byte user secret for client obfuscation
        //   --aes-pwd  = 32-512 byte binary file for ME RPC auth
        //
        // proxy-secret is from: https://core.telegram.org/getProxySecret
        // =============================================================
        let proxy_secret_path = config.general.proxy_secret_path.as_deref();
        let pool_size = config.general.middle_proxy_pool_size.max(1);
        let proxy_secret = loop {
            match crate::transport::middle_proxy::fetch_proxy_secret(
                proxy_secret_path,
                config.general.proxy_secret_len_max,
            )
            .await
            {
                Ok(proxy_secret) => break Some(proxy_secret),
                Err(e) => {
                    startup_tracker.set_me_last_error(Some(e.to_string())).await;
                    if me2dc_fallback {
                        error!(
                            error = %e,
                            "ME startup failed: proxy-secret is unavailable and no saved secret found; falling back to direct mode"
                        );
                        break None;
                    }
                    warn!(
                        error = %e,
                        retry_in_secs = 2,
                        "ME startup failed: proxy-secret is unavailable and no saved secret found; retrying because me2dc_fallback=false"
                    );
                    tokio::time::sleep(Duration::from_secs(2)).await;
                }
            }
        };

        match proxy_secret {
            Some(proxy_secret) => {
                startup_tracker
                    .complete_component(
                        COMPONENT_ME_SECRET_FETCH,
                        Some("proxy-secret loaded".to_string()),
                    )
                    .await;
                info!(
                    secret_len = proxy_secret.len(),
                    key_sig = format_args!(
                        "0x{:08x}",
                        if proxy_secret.len() >= 4 {
                            u32::from_le_bytes([
                                proxy_secret[0],
                                proxy_secret[1],
                                proxy_secret[2],
                                proxy_secret[3],
                            ])
                        } else {
                            0
                        }
                    ),
                    "Proxy-secret loaded"
                );

                startup_tracker
                    .start_component(
                        COMPONENT_ME_PROXY_CONFIG_V4,
                        Some("load startup proxy-config v4".to_string()),
                    )
                    .await;
                startup_tracker
                    .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_PROXY_CONFIG_V4)
                    .await;
                let cfg_v4 = load_startup_proxy_config_snapshot(
                    "https://core.telegram.org/getProxyConfig",
                    config.general.proxy_config_v4_cache_path.as_deref(),
                    me2dc_fallback,
                    "getProxyConfig",
                )
                .await;
                if cfg_v4.is_some() {
                    startup_tracker
                        .complete_component(
                            COMPONENT_ME_PROXY_CONFIG_V4,
                            Some("proxy-config v4 loaded".to_string()),
                        )
                        .await;
                } else {
                    startup_tracker
                        .fail_component(
                            COMPONENT_ME_PROXY_CONFIG_V4,
                            Some("proxy-config v4 unavailable".to_string()),
                        )
                        .await;
                }

                startup_tracker
                    .start_component(
                        COMPONENT_ME_PROXY_CONFIG_V6,
                        Some("load startup proxy-config v6".to_string()),
                    )
                    .await;
                startup_tracker
                    .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_PROXY_CONFIG_V6)
                    .await;
                let cfg_v6 = load_startup_proxy_config_snapshot(
                    "https://core.telegram.org/getProxyConfigV6",
                    config.general.proxy_config_v6_cache_path.as_deref(),
                    me2dc_fallback,
                    "getProxyConfigV6",
                )
                .await;
                if cfg_v6.is_some() {
                    startup_tracker
                        .complete_component(
                            COMPONENT_ME_PROXY_CONFIG_V6,
                            Some("proxy-config v6 loaded".to_string()),
                        )
                        .await;
                } else {
                    startup_tracker
                        .fail_component(
                            COMPONENT_ME_PROXY_CONFIG_V6,
                            Some("proxy-config v6 unavailable".to_string()),
                        )
                        .await;
                }

                if let (Some(cfg_v4), Some(cfg_v6)) = (cfg_v4, cfg_v6) {
                    startup_tracker
                        .start_component(
                            COMPONENT_ME_POOL_CONSTRUCT,
                            Some("construct ME pool".to_string()),
                        )
                        .await;
                    startup_tracker
                        .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_POOL_CONSTRUCT)
                        .await;
                    let pool = MePool::new(
                        proxy_tag.clone(),
                        proxy_secret,
                        config.general.middle_proxy_nat_ip,
                        me_nat_probe,
                        None,
                        config.network.stun_servers.clone(),
                        config.general.stun_nat_probe_concurrency,
                        probe.detected_ipv6,
                        config.timeouts.me_one_retry,
                        config.timeouts.me_one_timeout_ms,
                        cfg_v4.map.clone(),
                        cfg_v6.map.clone(),
                        cfg_v4.default_dc.or(cfg_v6.default_dc),
                        decision.clone(),
                        Some(upstream_manager.clone()),
                        rng.clone(),
                        stats.clone(),
                        // Keepalive behaviour.
                        config.general.me_keepalive_enabled,
                        config.general.me_keepalive_interval_secs,
                        config.general.me_keepalive_jitter_secs,
                        config.general.me_keepalive_payload_random,
                        config.general.rpc_proxy_req_every,
                        // Warmup staggering.
                        config.general.me_warmup_stagger_enabled,
                        config.general.me_warmup_step_delay_ms,
                        config.general.me_warmup_step_jitter_ms,
                        // Reconnect/backoff policy.
                        config.general.me_reconnect_max_concurrent_per_dc,
                        config.general.me_reconnect_backoff_base_ms,
                        config.general.me_reconnect_backoff_cap_ms,
                        config.general.me_reconnect_fast_retry_count,
                        // Single-endpoint shadow/outage handling.
                        config.general.me_single_endpoint_shadow_writers,
                        config.general.me_single_endpoint_outage_mode_enabled,
                        config.general.me_single_endpoint_outage_disable_quarantine,
                        config.general.me_single_endpoint_outage_backoff_min_ms,
                        config.general.me_single_endpoint_outage_backoff_max_ms,
                        config.general.me_single_endpoint_shadow_rotate_every_secs,
                        // Adaptive writer floor.
                        config.general.me_floor_mode,
                        config.general.me_adaptive_floor_idle_secs,
                        config.general.me_adaptive_floor_min_writers_single_endpoint,
                        config.general.me_adaptive_floor_min_writers_multi_endpoint,
                        config.general.me_adaptive_floor_recover_grace_secs,
                        config.general.me_adaptive_floor_writers_per_core_total,
                        config.general.me_adaptive_floor_cpu_cores_override,
                        config.general.me_adaptive_floor_max_extra_writers_single_per_core,
                        config.general.me_adaptive_floor_max_extra_writers_multi_per_core,
                        config.general.me_adaptive_floor_max_active_writers_per_core,
                        config.general.me_adaptive_floor_max_warm_writers_per_core,
                        config.general.me_adaptive_floor_max_active_writers_global,
                        config.general.me_adaptive_floor_max_warm_writers_global,
                        // Hard swap / pool drain.
                        config.general.hardswap,
                        config.general.me_pool_drain_ttl_secs,
                        config.general.effective_me_pool_force_close_secs(),
                        config.general.me_pool_min_fresh_ratio,
                        config.general.me_hardswap_warmup_delay_min_ms,
                        config.general.me_hardswap_warmup_delay_max_ms,
                        config.general.me_hardswap_warmup_extra_passes,
                        config.general.me_hardswap_warmup_pass_backoff_base_ms,
                        // Bind/secret/writer-selection policy.
                        config.general.me_bind_stale_mode,
                        config.general.me_bind_stale_ttl_secs,
                        config.general.me_secret_atomic_snapshot,
                        config.general.me_deterministic_writer_sort,
                        config.general.me_writer_pick_mode,
                        config.general.me_writer_pick_sample_size,
                        config.general.me_socks_kdf_policy,
                        // Channel capacities and route backpressure.
                        config.general.me_writer_cmd_channel_capacity,
                        config.general.me_route_channel_capacity,
                        config.general.me_route_backpressure_base_timeout_ms,
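                        // The backpressure timeouts here and just below are also
                        // hot-reloadable via update_runtime_transport_policy()
                        // in the policy watcher task further down.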
                        config.general.me_route_backpressure_high_timeout_ms,
                        config.general.me_route_backpressure_high_watermark_pct,
                        config.general.me_health_interval_ms_unhealthy,
                        config.general.me_health_interval_ms_healthy,
                        config.general.me_warn_rate_limit_ms,
                        config.general.me_route_no_writer_mode,
                        config.general.me_route_no_writer_wait_ms,
                        config.general.me_route_inline_recovery_attempts,
                        config.general.me_route_inline_recovery_wait_ms,
                    );
                    startup_tracker
                        .complete_component(
                            COMPONENT_ME_POOL_CONSTRUCT,
                            Some("ME pool object created".to_string()),
                        )
                        .await;
                    *api_me_pool.write().await = Some(pool.clone());
                    startup_tracker
                        .start_component(
                            COMPONENT_ME_POOL_INIT_STAGE1,
                            Some("initialize ME pool writers".to_string()),
                        )
                        .await;
                    startup_tracker
                        .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_POOL_INIT_STAGE1)
                        .await;

                    let mut init_attempt: u32 = 0;
                    loop {
                        init_attempt = init_attempt.saturating_add(1);
                        startup_tracker.set_me_init_attempt(init_attempt).await;
                        match pool.init(pool_size, &rng).await {
                            Ok(()) => {
                                startup_tracker.set_me_last_error(None).await;
                                startup_tracker
                                    .complete_component(
                                        COMPONENT_ME_POOL_INIT_STAGE1,
                                        Some("ME pool initialized".to_string()),
                                    )
                                    .await;
                                startup_tracker
                                    .set_me_status(StartupMeStatus::Ready, "ready")
                                    .await;
                                info!(
                                    attempt = init_attempt,
                                    "Middle-End pool initialized successfully"
                                );

                                // Phase 4: Start health monitor
                                let pool_clone = pool.clone();
                                let rng_clone = rng.clone();
                                let min_conns = pool_size;
                                tokio::spawn(async move {
                                    crate::transport::middle_proxy::me_health_monitor(
                                        pool_clone, rng_clone, min_conns,
                                    )
                                    .await;
                                });
                                break Some(pool);
                            }
                            Err(e) => {
                                startup_tracker.set_me_last_error(Some(e.to_string())).await;
                                let retries_limited = me2dc_fallback && me_init_retry_attempts > 0;
                                if retries_limited && init_attempt >= me_init_retry_attempts {
                                    startup_tracker
                                        .fail_component(
                                            COMPONENT_ME_POOL_INIT_STAGE1,
                                            Some("ME init retry budget exhausted".to_string()),
                                        )
                                        .await;
                                    startup_tracker
                                        .set_me_status(StartupMeStatus::Failed, "failed")
                                        .await;
                                    error!(
                                        error = %e,
                                        attempt = init_attempt,
                                        retry_limit = me_init_retry_attempts,
                                        "ME pool init retries exhausted; falling back to direct mode"
                                    );
                                    break None;
                                }
                                let retry_limit = if !me2dc_fallback || me_init_retry_attempts == 0 {
                                    String::from("unlimited")
                                } else {
                                    me_init_retry_attempts.to_string()
                                };
                                if init_attempt >= me_init_warn_after_attempts {
                                    warn!(
                                        error = %e,
                                        attempt = init_attempt,
                                        retry_limit = retry_limit,
                                        me2dc_fallback = me2dc_fallback,
                                        retry_in_secs = 2,
                                        "ME pool is not ready yet; retrying startup initialization"
                                    );
                                } else {
                                    info!(
                                        error = %e,
                                        attempt = init_attempt,
                                        retry_limit = retry_limit,
                                        me2dc_fallback = me2dc_fallback,
                                        retry_in_secs = 2,
                                        "ME pool startup warmup: retrying initialization"
                                    );
                                }
                                pool.reset_stun_state();
                                tokio::time::sleep(Duration::from_secs(2)).await;
                            }
                        }
                    }
                } else {
                    startup_tracker
                        .skip_component(
                            COMPONENT_ME_POOL_CONSTRUCT,
                            Some("ME configs are incomplete".to_string()),
                        )
                        .await;
                    startup_tracker
                        .fail_component(
                            COMPONENT_ME_POOL_INIT_STAGE1,
                            Some("ME configs are incomplete".to_string()),
                        )
                        .await;
                    startup_tracker
                        .set_me_status(StartupMeStatus::Failed, "failed")
                        .await;
                    None
                }
            }
            None => {
                startup_tracker
                    .fail_component(
                        COMPONENT_ME_SECRET_FETCH,
                        Some("proxy-secret unavailable".to_string()),
                    )
                    .await;
                startup_tracker
                    .skip_component(
                        COMPONENT_ME_PROXY_CONFIG_V4,
                        Some("proxy-secret unavailable".to_string()),
                    )
                    .await;
                startup_tracker
                    .skip_component(
                        COMPONENT_ME_PROXY_CONFIG_V6,
                        Some("proxy-secret unavailable".to_string()),
                    )
                    .await;
                startup_tracker
                    .skip_component(
                        COMPONENT_ME_POOL_CONSTRUCT,
                        Some("proxy-secret unavailable".to_string()),
                    )
                    .await;
                startup_tracker
                    .fail_component(
                        COMPONENT_ME_POOL_INIT_STAGE1,
                        Some("proxy-secret unavailable".to_string()),
                    )
                    .await;
                startup_tracker
                    .set_me_status(StartupMeStatus::Failed, "failed")
                    .await;
                None
            }
        }
    } else {
        None
    };

    // If ME failed to initialize, force direct-only mode.
    if me_pool.is_some() {
        startup_tracker.set_transport_mode("middle_proxy").await;
        startup_tracker.set_degraded(false).await;
        info!("Transport: Middle-End Proxy - all DC-over-RPC");
    } else {
        let _ = use_middle_proxy;
        use_middle_proxy = false;
        // Make runtime config reflect direct-only mode for handlers.
        config.general.use_middle_proxy = false;
        startup_tracker.set_transport_mode("direct").await;
        startup_tracker.set_degraded(true).await;
        if me2dc_fallback {
            startup_tracker
                .set_me_status(StartupMeStatus::Failed, "fallback_to_direct")
                .await;
        } else {
            startup_tracker
                .set_me_status(StartupMeStatus::Skipped, "skipped")
                .await;
        }
        info!("Transport: Direct DC - TCP - standard DC-over-TCP");
    }

    // Freeze config after possible fallback decision
    let config = Arc::new(config);
    let replay_checker = Arc::new(ReplayChecker::new(
        config.access.replay_check_len,
        Duration::from_secs(config.access.replay_window_secs),
    ));
    let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096));

    // Middle-End ping before DC connectivity
    if me_pool.is_some() {
        startup_tracker
            .start_component(
                COMPONENT_ME_CONNECTIVITY_PING,
                Some("run startup ME connectivity check".to_string()),
            )
            .await;
    } else {
        startup_tracker
            .skip_component(
                COMPONENT_ME_CONNECTIVITY_PING,
                Some("ME pool is not available".to_string()),
            )
            .await;
    }
    if let Some(ref pool) = me_pool {
        let me_results = run_me_ping(pool, &rng).await;
        let v4_ok = me_results.iter().any(|r| {
            matches!(r.family, MePingFamily::V4)
                && r.samples.iter().any(|s| s.error.is_none() && s.handshake_ms.is_some())
        });
        let v6_ok = me_results.iter().any(|r| {
            matches!(r.family, MePingFamily::V6)
                && r.samples.iter().any(|s| s.error.is_none() && s.handshake_ms.is_some())
        });
        info!("================= Telegram ME Connectivity =================");
        if v4_ok && v6_ok {
            info!("  IPv4 and IPv6 available");
        } else if v4_ok {
            info!("  IPv4 only / IPv6 unavailable");
        } else if v6_ok {
            info!("  IPv6 only / IPv4 unavailable");
        } else {
            info!("  No ME connectivity");
        }
        let me_route =
            format_me_route(&config.upstreams, &me_results, prefer_ipv6, v4_ok, v6_ok).await;
        info!("  via {}", me_route);
        info!("============================================================");

        use std::collections::BTreeMap;
        let mut grouped: BTreeMap<_, Vec<MePingSample>> = BTreeMap::new();
        for report in me_results {
            for s in report.samples {
                grouped.entry(s.dc).or_default().push(s);
            }
        }
        let family_order = if prefer_ipv6 {
            vec![MePingFamily::V6, MePingFamily::V4]
        } else {
            vec![MePingFamily::V4, MePingFamily::V6]
        };
        for (dc, samples) in grouped {
            for family in &family_order {
                let fam_samples: Vec<&MePingSample> = samples
                    .iter()
                    .filter(|s| matches!(s.family, f if &f == family))
                    .collect();
                if fam_samples.is_empty() {
                    continue;
                }
                let fam_label = match family {
                    MePingFamily::V4 => "IPv4",
                    MePingFamily::V6 => "IPv6",
                };
                info!("  DC{} [{}]", dc, fam_label);
                for sample in fam_samples {
                    let line = format_sample_line(sample);
                    info!("{}", line);
                }
            }
        }
        info!("============================================================");
        startup_tracker
            .complete_component(
                COMPONENT_ME_CONNECTIVITY_PING,
                Some("startup ME connectivity check completed".to_string()),
            )
            .await;
    }

    info!("================= Telegram DC Connectivity =================");
    startup_tracker
        .start_component(
            COMPONENT_DC_CONNECTIVITY_PING,
            Some("run startup DC connectivity check".to_string()),
        )
        .await;
    let ping_results = upstream_manager
        .ping_all_dcs(prefer_ipv6, &config.dc_overrides, decision.ipv4_dc, decision.ipv6_dc)
        .await;
    for upstream_result in &ping_results {
        let v6_works = upstream_result.v6_results.iter().any(|r| r.rtt_ms.is_some());
        let v4_works = upstream_result.v4_results.iter().any(|r| r.rtt_ms.is_some());
        if upstream_result.both_available {
            if prefer_ipv6 {
                info!("  IPv6 in use / IPv4 is fallback");
            } else {
                info!("  IPv4 in use / IPv6 is fallback");
            }
        } else if v6_works && !v4_works {
            info!("  IPv6 only / IPv4 unavailable");
        } else if v4_works && !v6_works {
            info!("  IPv4 only / IPv6 unavailable");
        } else if !v6_works && !v4_works {
            info!("  No DC connectivity");
        }
        info!("  via {}", upstream_result.upstream_name);
        info!("============================================================");
        // Print IPv6 results first (only if IPv6 is available)
        if v6_works {
            for dc in &upstream_result.v6_results {
                let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port());
                match &dc.rtt_ms {
                    Some(rtt) => {
                        info!("  DC{} [IPv6] {} - {:.0} ms", dc.dc_idx, addr_str, rtt);
                    }
                    None => {
                        let err = dc.error.as_deref().unwrap_or("fail");
                        info!("  DC{} [IPv6] {} - FAIL ({})", dc.dc_idx, addr_str, err);
                    }
                }
            }
            info!("============================================================");
        }
        // Print IPv4 results (only if IPv4 is available), same layout as IPv6.
        if v4_works {
            for dc in &upstream_result.v4_results {
                let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port());
                match &dc.rtt_ms {
                    Some(rtt) => {
                        info!("  DC{} [IPv4] {} - {:.0} ms", dc.dc_idx, addr_str, rtt);
                    }
                    None => {
                        let err = dc.error.as_deref().unwrap_or("fail");
                        info!("  DC{} [IPv4] {} - FAIL ({})", dc.dc_idx, addr_str, err);
                    }
                }
            }
            info!("============================================================");
        }
    }
    startup_tracker
        .complete_component(
            COMPONENT_DC_CONNECTIVITY_PING,
            Some("startup DC connectivity check completed".to_string()),
        )
        .await;

    let initialized_secs = process_started_at.elapsed().as_secs();
    startup_tracker
        .start_component(
            COMPONENT_RUNTIME_READY,
            Some("finalize startup runtime state".to_string()),
        )
        .await;
    info!("===================== Telegram Startup =====================");
    info!(
        "  DC/ME Initialized in {} {}",
        initialized_secs,
        unit_label(initialized_secs, "second", "seconds")
    );
    info!("============================================================");
    if let Some(ref pool) = me_pool {
        pool.set_runtime_ready(true);
    }
    *api_me_pool.write().await = me_pool.clone();

    // Background tasks
    let um_clone = upstream_manager.clone();
    let decision_clone = decision.clone();
    let dc_overrides_for_health = config.dc_overrides.clone();
    tokio::spawn(async move {
        um_clone
            .run_health_checks(
                prefer_ipv6,
                decision_clone.ipv4_dc,
                decision_clone.ipv6_dc,
                dc_overrides_for_health,
            )
            .await;
    });
    let rc_clone = replay_checker.clone();
    tokio::spawn(async move {
        rc_clone.run_periodic_cleanup().await;
    });

    let detected_ip_v4: Option<std::net::IpAddr> = probe.detected_ipv4.map(std::net::IpAddr::V4);
    let detected_ip_v6: Option<std::net::IpAddr> = probe.detected_ipv6.map(std::net::IpAddr::V6);
    debug!("Detected IPs: v4={:?} v6={:?}", detected_ip_v4, detected_ip_v6);

    // ── Hot-reload watcher ────────────────────────────────────────────────
    // Uses inotify to detect file changes instantly (SIGHUP also works).
    // detected_ip_v4/v6 are passed so newly added users get correct TG links.
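    // The watcher returns two watch channels: the full Arc<ProxyConfig> snapshot
    // (fanned out to the API bridge, policy, IP-limit, and listener tasks below)
    // and the effective LogLevel (applied to the tracing filter further down).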
    startup_tracker
        .start_component(
            COMPONENT_CONFIG_WATCHER_START,
            Some("spawn config hot-reload watcher".to_string()),
        )
        .await;
    let (config_rx, mut log_level_rx): (
        tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
        tokio::sync::watch::Receiver<LogLevel>,
    ) = spawn_config_watcher(
        std::path::PathBuf::from(&config_path),
        config.clone(),
        detected_ip_v4,
        detected_ip_v6,
    );
    startup_tracker
        .complete_component(
            COMPONENT_CONFIG_WATCHER_START,
            Some("config hot-reload watcher started".to_string()),
        )
        .await;

    let mut config_rx_api_bridge = config_rx.clone();
    let api_config_tx_bridge = api_config_tx.clone();
    tokio::spawn(async move {
        loop {
            if config_rx_api_bridge.changed().await.is_err() {
                break;
            }
            let cfg = config_rx_api_bridge.borrow_and_update().clone();
            api_config_tx_bridge.send_replace(cfg);
        }
    });

    let stats_policy = stats.clone();
    let mut config_rx_policy = config_rx.clone();
    let me_pool_policy = me_pool.clone();
    tokio::spawn(async move {
        loop {
            if config_rx_policy.changed().await.is_err() {
                break;
            }
            let cfg = config_rx_policy.borrow_and_update().clone();
            stats_policy.apply_telemetry_policy(TelemetryPolicy::from_config(&cfg.general.telemetry));
            if let Some(pool) = &me_pool_policy {
                pool.update_runtime_transport_policy(
                    cfg.general.me_socks_kdf_policy,
                    cfg.general.me_route_backpressure_base_timeout_ms,
                    cfg.general.me_route_backpressure_high_timeout_ms,
                    cfg.general.me_route_backpressure_high_watermark_pct,
                );
            }
        }
    });

    let ip_tracker_policy = ip_tracker.clone();
    let mut config_rx_ip_limits = config_rx.clone();
    tokio::spawn(async move {
        let mut prev_limits = config_rx_ip_limits
            .borrow()
            .access
            .user_max_unique_ips
            .clone();
        let mut prev_mode = config_rx_ip_limits.borrow().access.user_max_unique_ips_mode;
        let mut prev_window = config_rx_ip_limits
            .borrow()
            .access
            .user_max_unique_ips_window_secs;
        loop {
            if config_rx_ip_limits.changed().await.is_err() {
                break;
            }
            let cfg = config_rx_ip_limits.borrow_and_update().clone();
            if prev_limits != cfg.access.user_max_unique_ips {
                ip_tracker_policy
                    .load_limits(&cfg.access.user_max_unique_ips)
                    .await;
                prev_limits = cfg.access.user_max_unique_ips.clone();
            }
            if prev_mode != cfg.access.user_max_unique_ips_mode
                || prev_window != cfg.access.user_max_unique_ips_window_secs
            {
                ip_tracker_policy
                    .set_limit_policy(
                        cfg.access.user_max_unique_ips_mode,
                        cfg.access.user_max_unique_ips_window_secs,
                    )
                    .await;
                prev_mode = cfg.access.user_max_unique_ips_mode;
                prev_window = cfg.access.user_max_unique_ips_window_secs;
            }
        }
    });

    let beobachten_writer = beobachten.clone();
    let config_rx_beobachten = config_rx.clone();
    tokio::spawn(async move {
        loop {
            let cfg = config_rx_beobachten.borrow().clone();
            let sleep_secs = cfg.general.beobachten_flush_secs.max(1);
            if cfg.general.beobachten {
                let ttl = Duration::from_secs(cfg.general.beobachten_minutes.saturating_mul(60));
                let path = cfg.general.beobachten_file.clone();
                let snapshot = beobachten_writer.snapshot_text(ttl);
                if let Err(e) = write_beobachten_snapshot(&path, &snapshot).await {
                    warn!(error = %e, path = %path, "Failed to flush beobachten snapshot");
                }
            }
            tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
        }
    });

    if let Some(ref pool) = me_pool {
        let reinit_trigger_capacity = config.general.me_reinit_trigger_channel.max(1);
        let (reinit_tx, reinit_rx) = mpsc::channel::<MeReinitTrigger>(reinit_trigger_capacity);
        let pool_clone_sched = pool.clone();
        let rng_clone_sched = rng.clone();
        let config_rx_clone_sched = config_rx.clone();
        tokio::spawn(async move {
            crate::transport::middle_proxy::me_reinit_scheduler(
                pool_clone_sched,
                rng_clone_sched,
                config_rx_clone_sched,
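                // Receiver half of the trigger channel; the sender is shared by
                // the config-updater and rotation tasks spawned next.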
                reinit_rx,
            )
            .await;
        });

        let pool_clone = pool.clone();
        let config_rx_clone = config_rx.clone();
        let reinit_tx_updater = reinit_tx.clone();
        tokio::spawn(async move {
            crate::transport::middle_proxy::me_config_updater(
                pool_clone,
                config_rx_clone,
                reinit_tx_updater,
            )
            .await;
        });

        let config_rx_clone_rot = config_rx.clone();
        let reinit_tx_rotation = reinit_tx.clone();
        tokio::spawn(async move {
            crate::transport::middle_proxy::me_rotation_task(config_rx_clone_rot, reinit_tx_rotation)
                .await;
        });
    }

    startup_tracker
        .start_component(COMPONENT_LISTENERS_BIND, Some("bind TCP/Unix listeners".to_string()))
        .await;
    let mut listeners = Vec::new();
    for listener_conf in &config.server.listeners {
        let addr = SocketAddr::new(listener_conf.ip, config.server.port);
        if addr.is_ipv4() && !decision.ipv4_dc {
            warn!(%addr, "Skipping IPv4 listener: IPv4 disabled by [network]");
            continue;
        }
        if addr.is_ipv6() && !decision.ipv6_dc {
            warn!(%addr, "Skipping IPv6 listener: IPv6 disabled by [network]");
            continue;
        }
        let options = ListenOptions {
            reuse_port: listener_conf.reuse_allow,
            ipv6_only: listener_conf.ip.is_ipv6(),
            ..Default::default()
        };
        match create_listener(addr, &options) {
            Ok(socket) => {
                let listener = TcpListener::from_std(socket.into())?;
                info!("Listening on {}", addr);
                let listener_proxy_protocol =
                    listener_conf.proxy_protocol.unwrap_or(config.server.proxy_protocol);
                // Resolve the public host for link generation
                let public_host = if let Some(ref announce) = listener_conf.announce {
                    // Use announce (IP or hostname) if explicitly set
                    announce.clone()
                } else if listener_conf.ip.is_unspecified() {
                    // Auto-detect for unspecified addresses
                    if listener_conf.ip.is_ipv4() {
                        detected_ip_v4
                            .map(|ip| ip.to_string())
                            .unwrap_or_else(|| listener_conf.ip.to_string())
                    } else {
                        detected_ip_v6
                            .map(|ip| ip.to_string())
                            .unwrap_or_else(|| listener_conf.ip.to_string())
                    }
                } else {
                    listener_conf.ip.to_string()
                };
                // Show per-listener proxy links only when public_host is not set
                if config.general.links.public_host.is_none()
                    && !config.general.links.show.is_empty()
                {
                    let link_port = config.general.links.public_port.unwrap_or(config.server.port);
                    print_proxy_links(&public_host, link_port, &config);
                }
                listeners.push((listener, listener_proxy_protocol));
            }
            Err(e) => {
                if e.kind() == std::io::ErrorKind::AddrInUse {
                    let owners = find_listener_processes(addr);
                    if owners.is_empty() {
                        error!(
                            %addr,
                            "Failed to bind: address already in use (owner process unresolved)"
                        );
                    } else {
                        for owner in owners {
                            error!(
                                %addr,
                                pid = owner.pid,
                                process = %owner.process,
                                "Failed to bind: address already in use"
                            );
                        }
                    }
                    if !listener_conf.reuse_allow {
                        error!(
                            %addr,
                            "reuse_allow=false; set [[server.listeners]].reuse_allow=true to allow multi-instance listening"
                        );
                    }
                } else {
                    error!("Failed to bind to {}: {}", addr, e);
                }
            }
        }
    }

    // Show proxy links once when public_host is set, OR when there are no TCP listeners
    // (unix-only mode) — use detected IP as fallback
    if !config.general.links.show.is_empty()
        && (config.general.links.public_host.is_some() || listeners.is_empty())
    {
        let (host, port) = if let Some(ref h) = config.general.links.public_host {
            (h.clone(), config.general.links.public_port.unwrap_or(config.server.port))
        } else {
            let ip = detected_ip_v4.or(detected_ip_v6).map(|ip| ip.to_string());
            if ip.is_none() {
                warn!(
                    "show_link is configured but public IP could not be detected. Set public_host in config."
                );
            }
            (
                ip.unwrap_or_else(|| "UNKNOWN".to_string()),
                config.general.links.public_port.unwrap_or(config.server.port),
            )
        };
        print_proxy_links(&host, port, &config);
    }

    if config.general.use_middle_proxy {
        if let Some(pool) = me_pool.as_ref() {
            let initial_open = pool.admission_ready_conditional_cast().await;
            admission_tx.send_replace(initial_open);
            if initial_open {
                info!("Conditional-admission gate: open (ME pool ready)");
            } else {
                warn!("Conditional-admission gate: closed (ME pool is not ready)");
            }
            let pool_for_gate = pool.clone();
            let admission_tx_gate = admission_tx.clone();
            let mut config_rx_gate = config_rx.clone();
            let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1);
            tokio::spawn(async move {
                let mut gate_open = initial_open;
                let mut open_streak = if initial_open { 1u32 } else { 0u32 };
                let mut close_streak = if initial_open { 0u32 } else { 1u32 };
                loop {
                    tokio::select! {
                        changed = config_rx_gate.changed() => {
                            if changed.is_err() {
                                break;
                            }
                            let cfg = config_rx_gate.borrow_and_update().clone();
                            admission_poll_ms = cfg.general.me_admission_poll_ms.max(1);
                            continue;
                        }
                        _ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {}
                    }
                    let ready = pool_for_gate.admission_ready_conditional_cast().await;
                    if ready {
                        open_streak = open_streak.saturating_add(1);
                        close_streak = 0;
                        if !gate_open && open_streak >= 2 {
                            gate_open = true;
                            admission_tx_gate.send_replace(true);
                            info!(
                                open_streak,
                                "Conditional-admission gate opened (ME pool recovered)"
                            );
                        }
                    } else {
                        close_streak = close_streak.saturating_add(1);
                        open_streak = 0;
                        if gate_open && close_streak >= 2 {
                            gate_open = false;
                            admission_tx_gate.send_replace(false);
                            warn!(
                                close_streak,
                                "Conditional-admission gate closed (ME pool has uncovered DC groups)"
                            );
                        }
                    }
                }
            });
        } else {
            admission_tx.send_replace(false);
            warn!("Conditional-admission gate: closed (ME pool is unavailable)");
        }
    } else {
        admission_tx.send_replace(true);
    }
    let _admission_tx_hold = admission_tx;

    // Unix socket setup (before listeners check so unix-only config works)
    let mut has_unix_listener = false;
    #[cfg(unix)]
    if let Some(ref unix_path) = config.server.listen_unix_sock {
        // Remove stale socket file if present (standard practice)
        let _ = tokio::fs::remove_file(unix_path).await;
        let unix_listener = UnixListener::bind(unix_path)?;
        // Apply socket permissions if configured
        if let Some(ref perm_str) = config.server.listen_unix_sock_perm {
            match u32::from_str_radix(perm_str.trim_start_matches('0'), 8) {
                Ok(mode) => {
                    use std::os::unix::fs::PermissionsExt;
                    let perms = std::fs::Permissions::from_mode(mode);
                    if let Err(e) = std::fs::set_permissions(unix_path, perms) {
                        error!("Failed to set unix socket permissions to {}: {}", perm_str, e);
                    } else {
                        info!("Listening on unix:{} (mode {})", unix_path, perm_str);
                    }
                }
                Err(e) => {
                    warn!("Invalid listen_unix_sock_perm '{}': {}. Ignoring.", perm_str, e);
                    info!("Listening on unix:{}", unix_path);
                }
            }
        } else {
            info!("Listening on unix:{}", unix_path);
        }
        has_unix_listener = true;

        let mut config_rx_unix: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
        let mut admission_rx_unix = admission_rx.clone();
        let stats = stats.clone();
        let upstream_manager = upstream_manager.clone();
        let replay_checker = replay_checker.clone();
        let buffer_pool = buffer_pool.clone();
        let rng = rng.clone();
        let me_pool = me_pool.clone();
        let tls_cache = tls_cache.clone();
        let ip_tracker = ip_tracker.clone();
        let beobachten = beobachten.clone();
        let max_connections_unix = max_connections.clone();
        tokio::spawn(async move {
            let unix_conn_counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1));
            loop {
                if !wait_until_admission_open(&mut admission_rx_unix).await {
                    warn!("Conditional-admission gate channel closed for unix listener");
                    break;
                }
                match unix_listener.accept().await {
                    Ok((stream, _)) => {
                        let permit = match max_connections_unix.clone().acquire_owned().await {
                            Ok(permit) => permit,
                            Err(_) => {
                                error!("Connection limiter is closed");
                                break;
                            }
                        };
                        let conn_id =
                            unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        let fake_peer =
                            SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16));
                        let config = config_rx_unix.borrow_and_update().clone();
                        let stats = stats.clone();
                        let upstream_manager = upstream_manager.clone();
                        let replay_checker = replay_checker.clone();
                        let buffer_pool = buffer_pool.clone();
                        let rng = rng.clone();
                        let me_pool = me_pool.clone();
                        let tls_cache = tls_cache.clone();
                        let ip_tracker = ip_tracker.clone();
                        let beobachten = beobachten.clone();
                        let proxy_protocol_enabled = config.server.proxy_protocol;
                        tokio::spawn(async move {
                            let _permit = permit;
                            if let Err(e) = crate::proxy::client::handle_client_stream(
                                stream,
                                fake_peer,
                                config,
                                stats,
                                upstream_manager,
                                replay_checker,
                                buffer_pool,
                                rng,
                                me_pool,
                                tls_cache,
                                ip_tracker,
                                beobachten,
                                proxy_protocol_enabled,
                            )
                            .await
                            {
                                debug!(error = %e, "Unix socket connection error");
                            }
                        });
                    }
                    Err(e) => {
                        error!("Unix socket accept error: {}", e);
                        tokio::time::sleep(Duration::from_millis(100)).await;
                    }
                }
            }
        });
    }

    startup_tracker
        .complete_component(
            COMPONENT_LISTENERS_BIND,
            Some(format!(
                "listeners configured tcp={} unix={}",
                listeners.len(),
                has_unix_listener
            )),
        )
        .await;
    if listeners.is_empty() && !has_unix_listener {
        error!("No listeners. Exiting.");
        std::process::exit(1);
    }

    // Switch to user-configured log level after startup
    let runtime_filter = if has_rust_log {
        EnvFilter::from_default_env()
    } else if matches!(effective_log_level, LogLevel::Silent) {
        EnvFilter::new("warn,telemt::links=info")
    } else {
        EnvFilter::new(effective_log_level.to_filter_str())
    };
    filter_handle
        .reload(runtime_filter)
        .expect("Failed to switch log filter");

    // Apply log_level changes from hot-reload to the tracing filter.
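    // Note: an explicit RUST_LOG only wins at startup (see runtime_filter above);
    // a hot-reloaded [general].log_level still replaces the filter here.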
    tokio::spawn(async move {
        loop {
            if log_level_rx.changed().await.is_err() {
                break;
            }
            let level = log_level_rx.borrow_and_update().clone();
            let new_filter = tracing_subscriber::EnvFilter::new(level.to_filter_str());
            if let Err(e) = filter_handle.reload(new_filter) {
                tracing::error!("config reload: failed to update log filter: {}", e);
            }
        }
    });

    if let Some(port) = config.server.metrics_port {
        startup_tracker
            .start_component(
                COMPONENT_METRICS_START,
                Some(format!("spawn metrics endpoint on {}", port)),
            )
            .await;
        let stats = stats.clone();
        let beobachten = beobachten.clone();
        let config_rx_metrics = config_rx.clone();
        let ip_tracker_metrics = ip_tracker.clone();
        let whitelist = config.server.metrics_whitelist.clone();
        tokio::spawn(async move {
            metrics::serve(
                port,
                stats,
                beobachten,
                ip_tracker_metrics,
                config_rx_metrics,
                whitelist,
            )
            .await;
        });
        startup_tracker
            .complete_component(COMPONENT_METRICS_START, Some("metrics task spawned".to_string()))
            .await;
    } else {
        startup_tracker
            .skip_component(
                COMPONENT_METRICS_START,
                Some("server.metrics_port is not configured".to_string()),
            )
            .await;
    }

    startup_tracker
        .complete_component(
            COMPONENT_RUNTIME_READY,
            Some("startup pipeline is fully initialized".to_string()),
        )
        .await;
    startup_tracker.mark_ready().await;

    for (listener, listener_proxy_protocol) in listeners {
        let mut config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>> = config_rx.clone();
        let mut admission_rx_tcp = admission_rx.clone();
        let stats = stats.clone();
        let upstream_manager = upstream_manager.clone();
        let replay_checker = replay_checker.clone();
        let buffer_pool = buffer_pool.clone();
        let rng = rng.clone();
        let me_pool = me_pool.clone();
        let tls_cache = tls_cache.clone();
        let ip_tracker = ip_tracker.clone();
        let beobachten = beobachten.clone();
        let max_connections_tcp = max_connections.clone();
        tokio::spawn(async move {
            loop {
                if !wait_until_admission_open(&mut admission_rx_tcp).await {
                    warn!("Conditional-admission gate channel closed for tcp listener");
                    break;
                }
                match listener.accept().await {
                    Ok((stream, peer_addr)) => {
                        let permit = match max_connections_tcp.clone().acquire_owned().await {
                            Ok(permit) => permit,
                            Err(_) => {
                                error!("Connection limiter is closed");
                                break;
                            }
                        };
                        let config = config_rx.borrow_and_update().clone();
                        let stats = stats.clone();
                        let upstream_manager = upstream_manager.clone();
                        let replay_checker = replay_checker.clone();
                        let buffer_pool = buffer_pool.clone();
                        let rng = rng.clone();
                        let me_pool = me_pool.clone();
                        let tls_cache = tls_cache.clone();
                        let ip_tracker = ip_tracker.clone();
                        let beobachten = beobachten.clone();
                        let proxy_protocol_enabled = listener_proxy_protocol;
                        tokio::spawn(async move {
                            let _permit = permit;
                            if let Err(e) = ClientHandler::new(
                                stream,
                                peer_addr,
                                config,
                                stats,
                                upstream_manager,
                                replay_checker,
                                buffer_pool,
                                rng,
                                me_pool,
                                tls_cache,
                                ip_tracker,
                                beobachten,
                                proxy_protocol_enabled,
                            )
                            .run()
                            .await
                            {
                                let peer_closed = matches!(
                                    &e,
                                    crate::error::ProxyError::Io(ioe) if matches!(
                                        ioe.kind(),
                                        std::io::ErrorKind::ConnectionReset
                                            | std::io::ErrorKind::ConnectionAborted
                                            | std::io::ErrorKind::BrokenPipe
                                            | std::io::ErrorKind::NotConnected
                                    )
                                ) || matches!(
                                    &e,
                                    crate::error::ProxyError::Stream(
                                        crate::error::StreamError::Io(ioe)
                                    ) if matches!(
                                        ioe.kind(),
                                        std::io::ErrorKind::ConnectionReset
                                            | std::io::ErrorKind::ConnectionAborted
                                            | std::io::ErrorKind::BrokenPipe
                                            | std::io::ErrorKind::NotConnected
                                    )
                                );
                                let me_closed = matches!(
                                    &e,
                                    crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost"
                                );
                                match (peer_closed, me_closed) {
                                    (true, _) => debug!(peer = %peer_addr, error = %e, "Connection closed by client"),
                                    (_, true) => warn!(peer = %peer_addr, error = %e, "Connection closed: Middle-End dropped session"),
                                    _ => warn!(peer = %peer_addr, error = %e, "Connection closed with error"),
                                }
                            }
                        });
                    }
                    Err(e) => {
                        error!("Accept error: {}", e);
                        tokio::time::sleep(Duration::from_millis(100)).await;
                    }
                }
            }
        });
    }

    match signal::ctrl_c().await {
        Ok(()) => {
            let shutdown_started_at = Instant::now();
            info!("Shutting down...");
            let uptime_secs = process_started_at.elapsed().as_secs();
            info!("Uptime: {}", format_uptime(uptime_secs));
            if let Some(pool) = &me_pool {
                match tokio::time::timeout(
                    Duration::from_secs(2),
                    pool.shutdown_send_close_conn_all(),
                )
                .await
                {
                    Ok(total) => {
                        info!(
                            close_conn_sent = total,
                            "ME shutdown: RPC_CLOSE_CONN broadcast completed"
                        );
                    }
                    Err(_) => {
                        warn!("ME shutdown: RPC_CLOSE_CONN broadcast timed out");
                    }
                }
            }
            let shutdown_secs = shutdown_started_at.elapsed().as_secs();
            info!(
                "Shutdown completed successfully in {} {}.",
                shutdown_secs,
                unit_label(shutdown_secs, "second", "seconds")
            );
        }
        Err(e) => error!("Signal error: {}", e),
    }

    Ok(())
}
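
// ---------------------------------------------------------------------------
// Minimal smoke tests (added sketch) for the pure helpers above. Expected
// strings follow the 30-day month / 12-month year approximation that
// `format_uptime` itself uses; names below are test-local only.
// ---------------------------------------------------------------------------
#[cfg(test)]
mod helper_tests {
    use super::{format_uptime, unit_label};

    #[test]
    fn unit_label_picks_plural_form() {
        assert_eq!(unit_label(1, "second", "seconds"), "second");
        assert_eq!(unit_label(0, "second", "seconds"), "seconds");
        assert_eq!(unit_label(2, "second", "seconds"), "seconds");
    }

    #[test]
    fn format_uptime_decomposes_mixed_durations() {
        // 1 hour + 1 minute + 5 seconds.
        assert_eq!(
            format_uptime(3665),
            "1 hour, 1 minute, 5 seconds / 3665 seconds"
        );
        // Sub-minute uptimes print only the seconds component.
        assert_eq!(format_uptime(59), "59 seconds / 59 seconds");
        // Exact unit boundaries include the larger unit (see `>=` above).
        assert_eq!(format_uptime(60), "1 minute, 0 seconds / 60 seconds");
    }
}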