From 4c94f73546271671cef25c30e139b6f4ba68f579 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 9 Mar 2026 11:05:46 +0300 Subject: [PATCH] Maestro - Refactored Main Format --- src/maestro/admission.rs | 116 ++ src/maestro/connectivity.rs | 220 ++++ src/maestro/helpers.rs | 320 +++++ src/maestro/listeners.rs | 465 +++++++ src/maestro/me_startup.rs | 425 +++++++ src/maestro/mod.rs | 546 ++++++++ src/maestro/runtime_tasks.rs | 317 +++++ src/maestro/shutdown.rs | 42 + src/maestro/tls_bootstrap.rs | 165 +++ src/main.rs | 2258 +--------------------------------- src/proxy/client.rs | 11 +- src/proxy/middle_relay.rs | 2 +- 12 files changed, 2627 insertions(+), 2260 deletions(-) create mode 100644 src/maestro/admission.rs create mode 100644 src/maestro/connectivity.rs create mode 100644 src/maestro/helpers.rs create mode 100644 src/maestro/listeners.rs create mode 100644 src/maestro/me_startup.rs create mode 100644 src/maestro/mod.rs create mode 100644 src/maestro/runtime_tasks.rs create mode 100644 src/maestro/shutdown.rs create mode 100644 src/maestro/tls_bootstrap.rs diff --git a/src/maestro/admission.rs b/src/maestro/admission.rs new file mode 100644 index 0000000..40a0c2d --- /dev/null +++ b/src/maestro/admission.rs @@ -0,0 +1,116 @@ +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use tokio::sync::watch; +use tracing::{info, warn}; + +use crate::config::ProxyConfig; +use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController}; +use crate::transport::middle_proxy::MePool; + +pub(crate) async fn configure_admission_gate( + config: &Arc, + me_pool: Option>, + route_runtime: Arc, + admission_tx: &watch::Sender, + config_rx: watch::Receiver>, +) { + if config.general.use_middle_proxy { + if let Some(pool) = me_pool.as_ref() { + let fallback_after = Duration::from_secs(6); + let initial_ready = pool.admission_ready_conditional_cast().await; + admission_tx.send_replace(initial_ready); + let _ = route_runtime.set_mode(RelayRouteMode::Middle); + if initial_ready { + info!("Conditional-admission gate: open (ME pool ready)"); + } else { + warn!("Conditional-admission gate: closed (ME pool is not ready)"); + } + + let pool_for_gate = pool.clone(); + let admission_tx_gate = admission_tx.clone(); + let route_runtime_gate = route_runtime.clone(); + let mut config_rx_gate = config_rx.clone(); + let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1); + let mut fallback_enabled = config.general.me2dc_fallback; + tokio::spawn(async move { + let mut gate_open = initial_ready; + let mut route_mode = RelayRouteMode::Middle; + let mut not_ready_since = if initial_ready { + None + } else { + Some(Instant::now()) + }; + loop { + tokio::select! { + changed = config_rx_gate.changed() => { + if changed.is_err() { + break; + } + let cfg = config_rx_gate.borrow_and_update().clone(); + admission_poll_ms = cfg.general.me_admission_poll_ms.max(1); + fallback_enabled = cfg.general.me2dc_fallback; + continue; + } + _ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {} + } + let ready = pool_for_gate.admission_ready_conditional_cast().await; + let now = Instant::now(); + let (next_gate_open, next_route_mode, next_fallback_active) = if ready { + not_ready_since = None; + (true, RelayRouteMode::Middle, false) + } else { + let not_ready_started_at = *not_ready_since.get_or_insert(now); + let not_ready_for = now.saturating_duration_since(not_ready_started_at); + if fallback_enabled && not_ready_for > fallback_after { + (true, RelayRouteMode::Direct, true) + } else { + (false, RelayRouteMode::Middle, false) + } + }; + + if next_route_mode != route_mode { + route_mode = next_route_mode; + if let Some(snapshot) = route_runtime_gate.set_mode(route_mode) { + if matches!(route_mode, RelayRouteMode::Middle) { + info!( + target_mode = route_mode.as_str(), + cutover_generation = snapshot.generation, + "Middle-End routing restored for new sessions" + ); + } else { + warn!( + target_mode = route_mode.as_str(), + cutover_generation = snapshot.generation, + grace_secs = fallback_after.as_secs(), + "ME pool stayed not-ready beyond grace; routing new sessions via Direct-DC" + ); + } + } + } + + if next_gate_open != gate_open { + gate_open = next_gate_open; + admission_tx_gate.send_replace(gate_open); + if gate_open { + if next_fallback_active { + warn!("Conditional-admission gate opened in ME fallback mode"); + } else { + info!("Conditional-admission gate opened (ME pool ready)"); + } + } else { + warn!("Conditional-admission gate closed (ME pool is not ready)"); + } + } + } + }); + } else { + admission_tx.send_replace(false); + let _ = route_runtime.set_mode(RelayRouteMode::Direct); + warn!("Conditional-admission gate: closed (ME pool is unavailable)"); + } + } else { + admission_tx.send_replace(true); + let _ = route_runtime.set_mode(RelayRouteMode::Direct); + } +} diff --git a/src/maestro/connectivity.rs b/src/maestro/connectivity.rs new file mode 100644 index 0000000..c843223 --- /dev/null +++ b/src/maestro/connectivity.rs @@ -0,0 +1,220 @@ +use std::sync::Arc; +use std::time::Instant; + +use tokio::sync::RwLock; +use tracing::info; + +use crate::config::ProxyConfig; +use crate::crypto::SecureRandom; +use crate::network::probe::NetworkDecision; +use crate::startup::{ + COMPONENT_DC_CONNECTIVITY_PING, COMPONENT_ME_CONNECTIVITY_PING, COMPONENT_RUNTIME_READY, + StartupTracker, +}; +use crate::transport::middle_proxy::{ + MePingFamily, MePingSample, MePool, format_me_route, format_sample_line, run_me_ping, +}; +use crate::transport::UpstreamManager; + +pub(crate) async fn run_startup_connectivity( + config: &Arc, + me_pool: &Option>, + rng: Arc, + startup_tracker: &Arc, + upstream_manager: Arc, + prefer_ipv6: bool, + decision: &NetworkDecision, + process_started_at: Instant, + api_me_pool: Arc>>>, +) { + if me_pool.is_some() { + startup_tracker + .start_component( + COMPONENT_ME_CONNECTIVITY_PING, + Some("run startup ME connectivity check".to_string()), + ) + .await; + } else { + startup_tracker + .skip_component( + COMPONENT_ME_CONNECTIVITY_PING, + Some("ME pool is not available".to_string()), + ) + .await; + } + if let Some(pool) = me_pool { + let me_results = run_me_ping(pool, &rng).await; + + let v4_ok = me_results.iter().any(|r| { + matches!(r.family, MePingFamily::V4) + && r.samples.iter().any(|s| s.error.is_none() && s.handshake_ms.is_some()) + }); + let v6_ok = me_results.iter().any(|r| { + matches!(r.family, MePingFamily::V6) + && r.samples.iter().any(|s| s.error.is_none() && s.handshake_ms.is_some()) + }); + + info!("================= Telegram ME Connectivity ================="); + if v4_ok && v6_ok { + info!(" IPv4 and IPv6 available"); + } else if v4_ok { + info!(" IPv4 only / IPv6 unavailable"); + } else if v6_ok { + info!(" IPv6 only / IPv4 unavailable"); + } else { + info!(" No ME connectivity"); + } + let me_route = + format_me_route(&config.upstreams, &me_results, prefer_ipv6, v4_ok, v6_ok).await; + info!(" via {}", me_route); + info!("============================================================"); + + use std::collections::BTreeMap; + let mut grouped: BTreeMap> = BTreeMap::new(); + for report in me_results { + for s in report.samples { + grouped.entry(s.dc).or_default().push(s); + } + } + + let family_order = if prefer_ipv6 { + vec![MePingFamily::V6, MePingFamily::V4] + } else { + vec![MePingFamily::V4, MePingFamily::V6] + }; + + for (dc, samples) in grouped { + for family in &family_order { + let fam_samples: Vec<&MePingSample> = samples + .iter() + .filter(|s| matches!(s.family, f if &f == family)) + .collect(); + if fam_samples.is_empty() { + continue; + } + + let fam_label = match family { + MePingFamily::V4 => "IPv4", + MePingFamily::V6 => "IPv6", + }; + info!(" DC{} [{}]", dc, fam_label); + for sample in fam_samples { + let line = format_sample_line(sample); + info!("{}", line); + } + } + } + info!("============================================================"); + startup_tracker + .complete_component( + COMPONENT_ME_CONNECTIVITY_PING, + Some("startup ME connectivity check completed".to_string()), + ) + .await; + } + + info!("================= Telegram DC Connectivity ================="); + startup_tracker + .start_component( + COMPONENT_DC_CONNECTIVITY_PING, + Some("run startup DC connectivity check".to_string()), + ) + .await; + + let ping_results = upstream_manager + .ping_all_dcs( + prefer_ipv6, + &config.dc_overrides, + decision.ipv4_dc, + decision.ipv6_dc, + ) + .await; + + for upstream_result in &ping_results { + let v6_works = upstream_result.v6_results.iter().any(|r| r.rtt_ms.is_some()); + let v4_works = upstream_result.v4_results.iter().any(|r| r.rtt_ms.is_some()); + + if upstream_result.both_available { + if prefer_ipv6 { + info!(" IPv6 in use / IPv4 is fallback"); + } else { + info!(" IPv4 in use / IPv6 is fallback"); + } + } else if v6_works && !v4_works { + info!(" IPv6 only / IPv4 unavailable"); + } else if v4_works && !v6_works { + info!(" IPv4 only / IPv6 unavailable"); + } else if !v6_works && !v4_works { + info!(" No DC connectivity"); + } + + info!(" via {}", upstream_result.upstream_name); + info!("============================================================"); + + if v6_works { + for dc in &upstream_result.v6_results { + let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port()); + match &dc.rtt_ms { + Some(rtt) => { + info!(" DC{} [IPv6] {} - {:.0} ms", dc.dc_idx, addr_str, rtt); + } + None => { + let err = dc.error.as_deref().unwrap_or("fail"); + info!(" DC{} [IPv6] {} - FAIL ({})", dc.dc_idx, addr_str, err); + } + } + } + + info!("============================================================"); + } + + if v4_works { + for dc in &upstream_result.v4_results { + let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port()); + match &dc.rtt_ms { + Some(rtt) => { + info!( + " DC{} [IPv4] {}\t\t\t\t{:.0} ms", + dc.dc_idx, addr_str, rtt + ); + } + None => { + let err = dc.error.as_deref().unwrap_or("fail"); + info!( + " DC{} [IPv4] {}:\t\t\t\tFAIL ({})", + dc.dc_idx, addr_str, err + ); + } + } + } + + info!("============================================================"); + } + } + startup_tracker + .complete_component( + COMPONENT_DC_CONNECTIVITY_PING, + Some("startup DC connectivity check completed".to_string()), + ) + .await; + + let initialized_secs = process_started_at.elapsed().as_secs(); + let second_suffix = if initialized_secs == 1 { "" } else { "s" }; + startup_tracker + .start_component( + COMPONENT_RUNTIME_READY, + Some("finalize startup runtime state".to_string()), + ) + .await; + info!("===================== Telegram Startup ====================="); + info!( + " DC/ME Initialized in {} second{}", + initialized_secs, second_suffix + ); + info!("============================================================"); + + if let Some(pool) = me_pool { + pool.set_runtime_ready(true); + } + *api_me_pool.write().await = me_pool.clone(); +} diff --git a/src/maestro/helpers.rs b/src/maestro/helpers.rs new file mode 100644 index 0000000..acaecdd --- /dev/null +++ b/src/maestro/helpers.rs @@ -0,0 +1,320 @@ +use std::time::Duration; + +use tokio::sync::watch; +use tracing::{debug, error, info, warn}; + +use crate::cli; +use crate::config::ProxyConfig; +use crate::transport::middle_proxy::{ + ProxyConfigData, fetch_proxy_config_with_raw, load_proxy_config_cache, save_proxy_config_cache, +}; + +pub(crate) fn parse_cli() -> (String, bool, Option) { + let mut config_path = "config.toml".to_string(); + let mut silent = false; + let mut log_level: Option = None; + + let args: Vec = std::env::args().skip(1).collect(); + + // Check for --init first (handled before tokio) + if let Some(init_opts) = cli::parse_init_args(&args) { + if let Err(e) = cli::run_init(init_opts) { + eprintln!("[telemt] Init failed: {}", e); + std::process::exit(1); + } + std::process::exit(0); + } + + let mut i = 0; + while i < args.len() { + match args[i].as_str() { + "--silent" | "-s" => { + silent = true; + } + "--log-level" => { + i += 1; + if i < args.len() { + log_level = Some(args[i].clone()); + } + } + s if s.starts_with("--log-level=") => { + log_level = Some(s.trim_start_matches("--log-level=").to_string()); + } + "--help" | "-h" => { + eprintln!("Usage: telemt [config.toml] [OPTIONS]"); + eprintln!(); + eprintln!("Options:"); + eprintln!(" --silent, -s Suppress info logs"); + eprintln!(" --log-level debug|verbose|normal|silent"); + eprintln!(" --help, -h Show this help"); + eprintln!(); + eprintln!("Setup (fire-and-forget):"); + eprintln!( + " --init Generate config, install systemd service, start" + ); + eprintln!(" --port Listen port (default: 443)"); + eprintln!( + " --domain TLS domain for masking (default: www.google.com)" + ); + eprintln!( + " --secret 32-char hex secret (auto-generated if omitted)" + ); + eprintln!(" --user Username (default: user)"); + eprintln!(" --config-dir Config directory (default: /etc/telemt)"); + eprintln!(" --no-start Don't start the service after install"); + std::process::exit(0); + } + "--version" | "-V" => { + println!("telemt {}", env!("CARGO_PKG_VERSION")); + std::process::exit(0); + } + s if !s.starts_with('-') => { + config_path = s.to_string(); + } + other => { + eprintln!("Unknown option: {}", other); + } + } + i += 1; + } + + (config_path, silent, log_level) +} + +pub(crate) fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { + info!(target: "telemt::links", "--- Proxy Links ({}) ---", host); + for user_name in config.general.links.show.resolve_users(&config.access.users) { + if let Some(secret) = config.access.users.get(user_name) { + info!(target: "telemt::links", "User: {}", user_name); + if config.general.modes.classic { + info!( + target: "telemt::links", + " Classic: tg://proxy?server={}&port={}&secret={}", + host, port, secret + ); + } + if config.general.modes.secure { + info!( + target: "telemt::links", + " DD: tg://proxy?server={}&port={}&secret=dd{}", + host, port, secret + ); + } + if config.general.modes.tls { + let mut domains = Vec::with_capacity(1 + config.censorship.tls_domains.len()); + domains.push(config.censorship.tls_domain.clone()); + for d in &config.censorship.tls_domains { + if !domains.contains(d) { + domains.push(d.clone()); + } + } + + for domain in domains { + let domain_hex = hex::encode(&domain); + info!( + target: "telemt::links", + " EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}", + host, port, secret, domain_hex + ); + } + } + } else { + warn!(target: "telemt::links", "User '{}' in show_link not found", user_name); + } + } + info!(target: "telemt::links", "------------------------"); +} + +pub(crate) async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result<()> { + if let Some(parent) = std::path::Path::new(path).parent() + && !parent.as_os_str().is_empty() + { + tokio::fs::create_dir_all(parent).await?; + } + tokio::fs::write(path, payload).await +} + +pub(crate) fn unit_label(value: u64, singular: &'static str, plural: &'static str) -> &'static str { + if value == 1 { singular } else { plural } +} + +pub(crate) fn format_uptime(total_secs: u64) -> String { + const SECS_PER_MINUTE: u64 = 60; + const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE; + const SECS_PER_DAY: u64 = 24 * SECS_PER_HOUR; + const SECS_PER_MONTH: u64 = 30 * SECS_PER_DAY; + const SECS_PER_YEAR: u64 = 12 * SECS_PER_MONTH; + + let mut remaining = total_secs; + let years = remaining / SECS_PER_YEAR; + remaining %= SECS_PER_YEAR; + let months = remaining / SECS_PER_MONTH; + remaining %= SECS_PER_MONTH; + let days = remaining / SECS_PER_DAY; + remaining %= SECS_PER_DAY; + let hours = remaining / SECS_PER_HOUR; + remaining %= SECS_PER_HOUR; + let minutes = remaining / SECS_PER_MINUTE; + let seconds = remaining % SECS_PER_MINUTE; + + let mut parts = Vec::new(); + if total_secs > SECS_PER_YEAR { + parts.push(format!("{} {}", years, unit_label(years, "year", "years"))); + } + if total_secs > SECS_PER_MONTH { + parts.push(format!( + "{} {}", + months, + unit_label(months, "month", "months") + )); + } + if total_secs > SECS_PER_DAY { + parts.push(format!("{} {}", days, unit_label(days, "day", "days"))); + } + if total_secs > SECS_PER_HOUR { + parts.push(format!("{} {}", hours, unit_label(hours, "hour", "hours"))); + } + if total_secs > SECS_PER_MINUTE { + parts.push(format!( + "{} {}", + minutes, + unit_label(minutes, "minute", "minutes") + )); + } + parts.push(format!( + "{} {}", + seconds, + unit_label(seconds, "second", "seconds") + )); + + format!("{} / {} seconds", parts.join(", "), total_secs) +} + +pub(crate) async fn wait_until_admission_open(admission_rx: &mut watch::Receiver) -> bool { + loop { + if *admission_rx.borrow() { + return true; + } + if admission_rx.changed().await.is_err() { + return *admission_rx.borrow(); + } + } +} + +pub(crate) fn is_expected_handshake_eof(err: &crate::error::ProxyError) -> bool { + err.to_string().contains("expected 64 bytes, got 0") +} + +pub(crate) async fn load_startup_proxy_config_snapshot( + url: &str, + cache_path: Option<&str>, + me2dc_fallback: bool, + label: &'static str, +) -> Option { + loop { + match fetch_proxy_config_with_raw(url).await { + Ok((cfg, raw)) => { + if !cfg.map.is_empty() { + if let Some(path) = cache_path + && let Err(e) = save_proxy_config_cache(path, &raw).await + { + warn!(error = %e, path, snapshot = label, "Failed to store startup proxy-config cache"); + } + return Some(cfg); + } + + warn!(snapshot = label, url, "Startup proxy-config is empty; trying disk cache"); + if let Some(path) = cache_path { + match load_proxy_config_cache(path).await { + Ok(cached) if !cached.map.is_empty() => { + info!( + snapshot = label, + path, + proxy_for_lines = cached.proxy_for_lines, + "Loaded startup proxy-config from disk cache" + ); + return Some(cached); + } + Ok(_) => { + warn!( + snapshot = label, + path, + "Startup proxy-config cache is empty; ignoring cache file" + ); + } + Err(cache_err) => { + debug!( + snapshot = label, + path, + error = %cache_err, + "Startup proxy-config cache unavailable" + ); + } + } + } + + if me2dc_fallback { + error!( + snapshot = label, + "Startup proxy-config unavailable and no saved config found; falling back to direct mode" + ); + return None; + } + + warn!( + snapshot = label, + retry_in_secs = 2, + "Startup proxy-config unavailable and no saved config found; retrying because me2dc_fallback=false" + ); + tokio::time::sleep(Duration::from_secs(2)).await; + } + Err(fetch_err) => { + if let Some(path) = cache_path { + match load_proxy_config_cache(path).await { + Ok(cached) if !cached.map.is_empty() => { + info!( + snapshot = label, + path, + proxy_for_lines = cached.proxy_for_lines, + "Loaded startup proxy-config from disk cache" + ); + return Some(cached); + } + Ok(_) => { + warn!( + snapshot = label, + path, + "Startup proxy-config cache is empty; ignoring cache file" + ); + } + Err(cache_err) => { + debug!( + snapshot = label, + path, + error = %cache_err, + "Startup proxy-config cache unavailable" + ); + } + } + } + + if me2dc_fallback { + error!( + snapshot = label, + error = %fetch_err, + "Startup proxy-config unavailable and no cached data; falling back to direct mode" + ); + return None; + } + + warn!( + snapshot = label, + error = %fetch_err, + retry_in_secs = 2, + "Startup proxy-config unavailable; retrying because me2dc_fallback=false" + ); + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } +} diff --git a/src/maestro/listeners.rs b/src/maestro/listeners.rs new file mode 100644 index 0000000..6296fd7 --- /dev/null +++ b/src/maestro/listeners.rs @@ -0,0 +1,465 @@ +use std::error::Error; +use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; +use std::time::Duration; + +use tokio::net::TcpListener; +#[cfg(unix)] +use tokio::net::UnixListener; +use tokio::sync::{Semaphore, watch}; +use tracing::{debug, error, info, warn}; + +use crate::config::ProxyConfig; +use crate::crypto::SecureRandom; +use crate::ip_tracker::UserIpTracker; +use crate::proxy::route_mode::{ROUTE_SWITCH_ERROR_MSG, RouteRuntimeController}; +use crate::proxy::ClientHandler; +use crate::startup::{COMPONENT_LISTENERS_BIND, StartupTracker}; +use crate::stats::beobachten::BeobachtenStore; +use crate::stats::{ReplayChecker, Stats}; +use crate::stream::BufferPool; +use crate::tls_front::TlsFrontCache; +use crate::transport::middle_proxy::MePool; +use crate::transport::{ + ListenOptions, UpstreamManager, create_listener, find_listener_processes, +}; + +use super::helpers::{is_expected_handshake_eof, print_proxy_links, wait_until_admission_open}; + +pub(crate) struct BoundListeners { + pub(crate) listeners: Vec<(TcpListener, bool)>, + pub(crate) has_unix_listener: bool, +} + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn bind_listeners( + config: &Arc, + decision_ipv4_dc: bool, + decision_ipv6_dc: bool, + detected_ip_v4: Option, + detected_ip_v6: Option, + startup_tracker: &Arc, + config_rx: watch::Receiver>, + admission_rx: watch::Receiver, + stats: Arc, + upstream_manager: Arc, + replay_checker: Arc, + buffer_pool: Arc, + rng: Arc, + me_pool: Option>, + route_runtime: Arc, + tls_cache: Option>, + ip_tracker: Arc, + beobachten: Arc, + max_connections: Arc, +) -> Result> { + startup_tracker + .start_component( + COMPONENT_LISTENERS_BIND, + Some("bind TCP/Unix listeners".to_string()), + ) + .await; + let mut listeners = Vec::new(); + + for listener_conf in &config.server.listeners { + let addr = SocketAddr::new(listener_conf.ip, config.server.port); + if addr.is_ipv4() && !decision_ipv4_dc { + warn!(%addr, "Skipping IPv4 listener: IPv4 disabled by [network]"); + continue; + } + if addr.is_ipv6() && !decision_ipv6_dc { + warn!(%addr, "Skipping IPv6 listener: IPv6 disabled by [network]"); + continue; + } + let options = ListenOptions { + reuse_port: listener_conf.reuse_allow, + ipv6_only: listener_conf.ip.is_ipv6(), + ..Default::default() + }; + + match create_listener(addr, &options) { + Ok(socket) => { + let listener = TcpListener::from_std(socket.into())?; + info!("Listening on {}", addr); + let listener_proxy_protocol = + listener_conf.proxy_protocol.unwrap_or(config.server.proxy_protocol); + + let public_host = if let Some(ref announce) = listener_conf.announce { + announce.clone() + } else if listener_conf.ip.is_unspecified() { + if listener_conf.ip.is_ipv4() { + detected_ip_v4 + .map(|ip| ip.to_string()) + .unwrap_or_else(|| listener_conf.ip.to_string()) + } else { + detected_ip_v6 + .map(|ip| ip.to_string()) + .unwrap_or_else(|| listener_conf.ip.to_string()) + } + } else { + listener_conf.ip.to_string() + }; + + if config.general.links.public_host.is_none() && !config.general.links.show.is_empty() { + let link_port = config.general.links.public_port.unwrap_or(config.server.port); + print_proxy_links(&public_host, link_port, config); + } + + listeners.push((listener, listener_proxy_protocol)); + } + Err(e) => { + if e.kind() == std::io::ErrorKind::AddrInUse { + let owners = find_listener_processes(addr); + if owners.is_empty() { + error!( + %addr, + "Failed to bind: address already in use (owner process unresolved)" + ); + } else { + for owner in owners { + error!( + %addr, + pid = owner.pid, + process = %owner.process, + "Failed to bind: address already in use" + ); + } + } + + if !listener_conf.reuse_allow { + error!( + %addr, + "reuse_allow=false; set [[server.listeners]].reuse_allow=true to allow multi-instance listening" + ); + } + } else { + error!("Failed to bind to {}: {}", addr, e); + } + } + } + } + + if !config.general.links.show.is_empty() + && (config.general.links.public_host.is_some() || listeners.is_empty()) + { + let (host, port) = if let Some(ref h) = config.general.links.public_host { + ( + h.clone(), + config.general.links.public_port.unwrap_or(config.server.port), + ) + } else { + let ip = detected_ip_v4 + .or(detected_ip_v6) + .map(|ip| ip.to_string()); + if ip.is_none() { + warn!( + "show_link is configured but public IP could not be detected. Set public_host in config." + ); + } + ( + ip.unwrap_or_else(|| "UNKNOWN".to_string()), + config.general.links.public_port.unwrap_or(config.server.port), + ) + }; + + print_proxy_links(&host, port, config); + } + + let mut has_unix_listener = false; + #[cfg(unix)] + if let Some(ref unix_path) = config.server.listen_unix_sock { + let _ = tokio::fs::remove_file(unix_path).await; + + let unix_listener = UnixListener::bind(unix_path)?; + + if let Some(ref perm_str) = config.server.listen_unix_sock_perm { + match u32::from_str_radix(perm_str.trim_start_matches('0'), 8) { + Ok(mode) => { + use std::os::unix::fs::PermissionsExt; + let perms = std::fs::Permissions::from_mode(mode); + if let Err(e) = std::fs::set_permissions(unix_path, perms) { + error!("Failed to set unix socket permissions to {}: {}", perm_str, e); + } else { + info!("Listening on unix:{} (mode {})", unix_path, perm_str); + } + } + Err(e) => { + warn!("Invalid listen_unix_sock_perm '{}': {}. Ignoring.", perm_str, e); + info!("Listening on unix:{}", unix_path); + } + } + } else { + info!("Listening on unix:{}", unix_path); + } + + has_unix_listener = true; + + let mut config_rx_unix: watch::Receiver> = config_rx.clone(); + let mut admission_rx_unix = admission_rx.clone(); + let stats = stats.clone(); + let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); + let buffer_pool = buffer_pool.clone(); + let rng = rng.clone(); + let me_pool = me_pool.clone(); + let route_runtime = route_runtime.clone(); + let tls_cache = tls_cache.clone(); + let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); + let max_connections_unix = max_connections.clone(); + + tokio::spawn(async move { + let unix_conn_counter = Arc::new(std::sync::atomic::AtomicU64::new(1)); + + loop { + if !wait_until_admission_open(&mut admission_rx_unix).await { + warn!("Conditional-admission gate channel closed for unix listener"); + break; + } + match unix_listener.accept().await { + Ok((stream, _)) => { + let permit = match max_connections_unix.clone().acquire_owned().await { + Ok(permit) => permit, + Err(_) => { + error!("Connection limiter is closed"); + break; + } + }; + let conn_id = + unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let fake_peer = + SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16)); + + let config = config_rx_unix.borrow_and_update().clone(); + let stats = stats.clone(); + let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); + let buffer_pool = buffer_pool.clone(); + let rng = rng.clone(); + let me_pool = me_pool.clone(); + let route_runtime = route_runtime.clone(); + let tls_cache = tls_cache.clone(); + let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); + let proxy_protocol_enabled = config.server.proxy_protocol; + + tokio::spawn(async move { + let _permit = permit; + if let Err(e) = crate::proxy::client::handle_client_stream( + stream, + fake_peer, + config, + stats, + upstream_manager, + replay_checker, + buffer_pool, + rng, + me_pool, + route_runtime, + tls_cache, + ip_tracker, + beobachten, + proxy_protocol_enabled, + ) + .await + { + debug!(error = %e, "Unix socket connection error"); + } + }); + } + Err(e) => { + error!("Unix socket accept error: {}", e); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } + }); + } + + startup_tracker + .complete_component( + COMPONENT_LISTENERS_BIND, + Some(format!( + "listeners configured tcp={} unix={}", + listeners.len(), + has_unix_listener + )), + ) + .await; + + Ok(BoundListeners { + listeners, + has_unix_listener, + }) +} + +#[allow(clippy::too_many_arguments)] +pub(crate) fn spawn_tcp_accept_loops( + listeners: Vec<(TcpListener, bool)>, + config_rx: watch::Receiver>, + admission_rx: watch::Receiver, + stats: Arc, + upstream_manager: Arc, + replay_checker: Arc, + buffer_pool: Arc, + rng: Arc, + me_pool: Option>, + route_runtime: Arc, + tls_cache: Option>, + ip_tracker: Arc, + beobachten: Arc, + max_connections: Arc, +) { + for (listener, listener_proxy_protocol) in listeners { + let mut config_rx: watch::Receiver> = config_rx.clone(); + let mut admission_rx_tcp = admission_rx.clone(); + let stats = stats.clone(); + let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); + let buffer_pool = buffer_pool.clone(); + let rng = rng.clone(); + let me_pool = me_pool.clone(); + let route_runtime = route_runtime.clone(); + let tls_cache = tls_cache.clone(); + let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); + let max_connections_tcp = max_connections.clone(); + + tokio::spawn(async move { + loop { + if !wait_until_admission_open(&mut admission_rx_tcp).await { + warn!("Conditional-admission gate channel closed for tcp listener"); + break; + } + match listener.accept().await { + Ok((stream, peer_addr)) => { + let permit = match max_connections_tcp.clone().acquire_owned().await { + Ok(permit) => permit, + Err(_) => { + error!("Connection limiter is closed"); + break; + } + }; + let config = config_rx.borrow_and_update().clone(); + let stats = stats.clone(); + let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); + let buffer_pool = buffer_pool.clone(); + let rng = rng.clone(); + let me_pool = me_pool.clone(); + let route_runtime = route_runtime.clone(); + let tls_cache = tls_cache.clone(); + let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); + let proxy_protocol_enabled = listener_proxy_protocol; + let real_peer_report = Arc::new(std::sync::Mutex::new(None)); + let real_peer_report_for_handler = real_peer_report.clone(); + + tokio::spawn(async move { + let _permit = permit; + if let Err(e) = ClientHandler::new( + stream, + peer_addr, + config, + stats, + upstream_manager, + replay_checker, + buffer_pool, + rng, + me_pool, + route_runtime, + tls_cache, + ip_tracker, + beobachten, + proxy_protocol_enabled, + real_peer_report_for_handler, + ) + .run() + .await + { + let real_peer = match real_peer_report.lock() { + Ok(guard) => *guard, + Err(_) => None, + }; + let peer_closed = matches!( + &e, + crate::error::ProxyError::Io(ioe) + if matches!( + ioe.kind(), + std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::NotConnected + ) + ) || matches!( + &e, + crate::error::ProxyError::Stream( + crate::error::StreamError::Io(ioe) + ) + if matches!( + ioe.kind(), + std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::NotConnected + ) + ); + + let me_closed = matches!( + &e, + crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost" + ); + let route_switched = matches!( + &e, + crate::error::ProxyError::Proxy(msg) if msg == ROUTE_SWITCH_ERROR_MSG + ); + + match (peer_closed, me_closed) { + (true, _) => { + if let Some(real_peer) = real_peer { + debug!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed by client"); + } else { + debug!(peer = %peer_addr, error = %e, "Connection closed by client"); + } + } + (_, true) => { + if let Some(real_peer) = real_peer { + warn!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed: Middle-End dropped session"); + } else { + warn!(peer = %peer_addr, error = %e, "Connection closed: Middle-End dropped session"); + } + } + _ if route_switched => { + if let Some(real_peer) = real_peer { + info!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed by controlled route cutover"); + } else { + info!(peer = %peer_addr, error = %e, "Connection closed by controlled route cutover"); + } + } + _ if is_expected_handshake_eof(&e) => { + if let Some(real_peer) = real_peer { + info!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed during initial handshake"); + } else { + info!(peer = %peer_addr, error = %e, "Connection closed during initial handshake"); + } + } + _ => { + if let Some(real_peer) = real_peer { + warn!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed with error"); + } else { + warn!(peer = %peer_addr, error = %e, "Connection closed with error"); + } + } + } + } + }); + } + Err(e) => { + error!("Accept error: {}", e); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } + }); + } +} diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs new file mode 100644 index 0000000..9c674cd --- /dev/null +++ b/src/maestro/me_startup.rs @@ -0,0 +1,425 @@ +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::RwLock; +use tracing::{error, info, warn}; + +use crate::config::ProxyConfig; +use crate::crypto::SecureRandom; +use crate::network::probe::{NetworkDecision, NetworkProbe}; +use crate::startup::{ + COMPONENT_ME_POOL_CONSTRUCT, COMPONENT_ME_POOL_INIT_STAGE1, COMPONENT_ME_PROXY_CONFIG_V4, + COMPONENT_ME_PROXY_CONFIG_V6, COMPONENT_ME_SECRET_FETCH, StartupMeStatus, StartupTracker, +}; +use crate::stats::Stats; +use crate::transport::middle_proxy::MePool; +use crate::transport::UpstreamManager; + +use super::helpers::load_startup_proxy_config_snapshot; + +pub(crate) async fn initialize_me_pool( + use_middle_proxy: bool, + config: &ProxyConfig, + decision: &NetworkDecision, + probe: &NetworkProbe, + startup_tracker: &Arc, + upstream_manager: Arc, + rng: Arc, + stats: Arc, + api_me_pool: Arc>>>, +) -> Option> { + if !use_middle_proxy { + return None; + } + + info!("=== Middle Proxy Mode ==="); + let me_nat_probe = config.general.middle_proxy_nat_probe && config.network.stun_use; + if config.general.middle_proxy_nat_probe && !config.network.stun_use { + info!("Middle-proxy STUN probing disabled by network.stun_use=false"); + } + + let me2dc_fallback = config.general.me2dc_fallback; + let me_init_retry_attempts = config.general.me_init_retry_attempts; + let me_init_warn_after_attempts: u32 = 3; + + // Global ad_tag (pool default). Used when user has no per-user tag in access.user_ad_tags. + let proxy_tag = config + .general + .ad_tag + .as_ref() + .map(|tag| hex::decode(tag).expect("general.ad_tag must be validated before startup")); + + // ============================================================= + // CRITICAL: Download Telegram proxy-secret (NOT user secret!) + // + // C MTProxy uses TWO separate secrets: + // -S flag = 16-byte user secret for client obfuscation + // --aes-pwd = 32-512 byte binary file for ME RPC auth + // + // proxy-secret is from: https://core.telegram.org/getProxySecret + // ============================================================= + let proxy_secret_path = config.general.proxy_secret_path.as_deref(); + let pool_size = config.general.middle_proxy_pool_size.max(1); + let proxy_secret = loop { + match crate::transport::middle_proxy::fetch_proxy_secret( + proxy_secret_path, + config.general.proxy_secret_len_max, + ) + .await + { + Ok(proxy_secret) => break Some(proxy_secret), + Err(e) => { + startup_tracker.set_me_last_error(Some(e.to_string())).await; + if me2dc_fallback { + error!( + error = %e, + "ME startup failed: proxy-secret is unavailable and no saved secret found; falling back to direct mode" + ); + break None; + } + + warn!( + error = %e, + retry_in_secs = 2, + "ME startup failed: proxy-secret is unavailable and no saved secret found; retrying because me2dc_fallback=false" + ); + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + }; + match proxy_secret { + Some(proxy_secret) => { + startup_tracker + .complete_component( + COMPONENT_ME_SECRET_FETCH, + Some("proxy-secret loaded".to_string()), + ) + .await; + info!( + secret_len = proxy_secret.len(), + key_sig = format_args!( + "0x{:08x}", + if proxy_secret.len() >= 4 { + u32::from_le_bytes([ + proxy_secret[0], + proxy_secret[1], + proxy_secret[2], + proxy_secret[3], + ]) + } else { + 0 + } + ), + "Proxy-secret loaded" + ); + + startup_tracker + .start_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("load startup proxy-config v4".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_PROXY_CONFIG_V4) + .await; + let cfg_v4 = load_startup_proxy_config_snapshot( + "https://core.telegram.org/getProxyConfig", + config.general.proxy_config_v4_cache_path.as_deref(), + me2dc_fallback, + "getProxyConfig", + ) + .await; + if cfg_v4.is_some() { + startup_tracker + .complete_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("proxy-config v4 loaded".to_string()), + ) + .await; + } else { + startup_tracker + .fail_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("proxy-config v4 unavailable".to_string()), + ) + .await; + } + startup_tracker + .start_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("load startup proxy-config v6".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_PROXY_CONFIG_V6) + .await; + let cfg_v6 = load_startup_proxy_config_snapshot( + "https://core.telegram.org/getProxyConfigV6", + config.general.proxy_config_v6_cache_path.as_deref(), + me2dc_fallback, + "getProxyConfigV6", + ) + .await; + if cfg_v6.is_some() { + startup_tracker + .complete_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("proxy-config v6 loaded".to_string()), + ) + .await; + } else { + startup_tracker + .fail_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("proxy-config v6 unavailable".to_string()), + ) + .await; + } + + if let (Some(cfg_v4), Some(cfg_v6)) = (cfg_v4, cfg_v6) { + startup_tracker + .start_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("construct ME pool".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_POOL_CONSTRUCT) + .await; + let pool = MePool::new( + proxy_tag.clone(), + proxy_secret, + config.general.middle_proxy_nat_ip, + me_nat_probe, + None, + config.network.stun_servers.clone(), + config.general.stun_nat_probe_concurrency, + probe.detected_ipv6, + config.timeouts.me_one_retry, + config.timeouts.me_one_timeout_ms, + cfg_v4.map.clone(), + cfg_v6.map.clone(), + cfg_v4.default_dc.or(cfg_v6.default_dc), + decision.clone(), + Some(upstream_manager.clone()), + rng.clone(), + stats.clone(), + config.general.me_keepalive_enabled, + config.general.me_keepalive_interval_secs, + config.general.me_keepalive_jitter_secs, + config.general.me_keepalive_payload_random, + config.general.rpc_proxy_req_every, + config.general.me_warmup_stagger_enabled, + config.general.me_warmup_step_delay_ms, + config.general.me_warmup_step_jitter_ms, + config.general.me_reconnect_max_concurrent_per_dc, + config.general.me_reconnect_backoff_base_ms, + config.general.me_reconnect_backoff_cap_ms, + config.general.me_reconnect_fast_retry_count, + config.general.me_single_endpoint_shadow_writers, + config.general.me_single_endpoint_outage_mode_enabled, + config.general.me_single_endpoint_outage_disable_quarantine, + config.general.me_single_endpoint_outage_backoff_min_ms, + config.general.me_single_endpoint_outage_backoff_max_ms, + config.general.me_single_endpoint_shadow_rotate_every_secs, + config.general.me_floor_mode, + config.general.me_adaptive_floor_idle_secs, + config.general.me_adaptive_floor_min_writers_single_endpoint, + config.general.me_adaptive_floor_min_writers_multi_endpoint, + config.general.me_adaptive_floor_recover_grace_secs, + config.general.me_adaptive_floor_writers_per_core_total, + config.general.me_adaptive_floor_cpu_cores_override, + config.general.me_adaptive_floor_max_extra_writers_single_per_core, + config.general.me_adaptive_floor_max_extra_writers_multi_per_core, + config.general.me_adaptive_floor_max_active_writers_per_core, + config.general.me_adaptive_floor_max_warm_writers_per_core, + config.general.me_adaptive_floor_max_active_writers_global, + config.general.me_adaptive_floor_max_warm_writers_global, + config.general.hardswap, + config.general.me_pool_drain_ttl_secs, + config.general.effective_me_pool_force_close_secs(), + config.general.me_pool_min_fresh_ratio, + config.general.me_hardswap_warmup_delay_min_ms, + config.general.me_hardswap_warmup_delay_max_ms, + config.general.me_hardswap_warmup_extra_passes, + config.general.me_hardswap_warmup_pass_backoff_base_ms, + config.general.me_bind_stale_mode, + config.general.me_bind_stale_ttl_secs, + config.general.me_secret_atomic_snapshot, + config.general.me_deterministic_writer_sort, + config.general.me_writer_pick_mode, + config.general.me_writer_pick_sample_size, + config.general.me_socks_kdf_policy, + config.general.me_writer_cmd_channel_capacity, + config.general.me_route_channel_capacity, + config.general.me_route_backpressure_base_timeout_ms, + config.general.me_route_backpressure_high_timeout_ms, + config.general.me_route_backpressure_high_watermark_pct, + config.general.me_reader_route_data_wait_ms, + config.general.me_health_interval_ms_unhealthy, + config.general.me_health_interval_ms_healthy, + config.general.me_warn_rate_limit_ms, + config.general.me_route_no_writer_mode, + config.general.me_route_no_writer_wait_ms, + config.general.me_route_inline_recovery_attempts, + config.general.me_route_inline_recovery_wait_ms, + ); + startup_tracker + .complete_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("ME pool object created".to_string()), + ) + .await; + *api_me_pool.write().await = Some(pool.clone()); + startup_tracker + .start_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("initialize ME pool writers".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_POOL_INIT_STAGE1) + .await; + + let mut init_attempt: u32 = 0; + loop { + init_attempt = init_attempt.saturating_add(1); + startup_tracker.set_me_init_attempt(init_attempt).await; + match pool.init(pool_size, &rng).await { + Ok(()) => { + startup_tracker.set_me_last_error(None).await; + startup_tracker + .complete_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME pool initialized".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Ready, "ready") + .await; + info!( + attempt = init_attempt, + "Middle-End pool initialized successfully" + ); + + // Phase 4: Start health monitor + let pool_clone = pool.clone(); + let rng_clone = rng.clone(); + let min_conns = pool_size; + tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + pool_clone, rng_clone, min_conns, + ) + .await; + }); + + break Some(pool); + } + Err(e) => { + startup_tracker.set_me_last_error(Some(e.to_string())).await; + let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; + if retries_limited && init_attempt >= me_init_retry_attempts { + startup_tracker + .fail_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME init retry budget exhausted".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Failed, "failed") + .await; + error!( + error = %e, + attempt = init_attempt, + retry_limit = me_init_retry_attempts, + "ME pool init retries exhausted; falling back to direct mode" + ); + break None; + } + + let retry_limit = if !me2dc_fallback || me_init_retry_attempts == 0 { + String::from("unlimited") + } else { + me_init_retry_attempts.to_string() + }; + if init_attempt >= me_init_warn_after_attempts { + warn!( + error = %e, + attempt = init_attempt, + retry_limit = retry_limit, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "ME pool is not ready yet; retrying startup initialization" + ); + } else { + info!( + error = %e, + attempt = init_attempt, + retry_limit = retry_limit, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "ME pool startup warmup: retrying initialization" + ); + } + pool.reset_stun_state(); + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } + } else { + startup_tracker + .skip_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("ME configs are incomplete".to_string()), + ) + .await; + startup_tracker + .fail_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("ME configs are incomplete".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Failed, "failed") + .await; + None + } + } + None => { + startup_tracker + .fail_component( + COMPONENT_ME_SECRET_FETCH, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .fail_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("proxy-secret unavailable".to_string()), + ) + .await; + startup_tracker + .set_me_status(StartupMeStatus::Failed, "failed") + .await; + None + } + } +} diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs new file mode 100644 index 0000000..92b42e3 --- /dev/null +++ b/src/maestro/mod.rs @@ -0,0 +1,546 @@ +//! telemt — Telegram MTProto Proxy + +#![allow(unused_assignments)] + +// Runtime orchestration modules. +// - helpers: CLI and shared startup/runtime helper routines. +// - tls_bootstrap: TLS front cache bootstrap and refresh tasks. +// - me_startup: Middle-End secret/config fetch and pool initialization. +// - connectivity: startup ME/DC connectivity diagnostics. +// - runtime_tasks: hot-reload and background task orchestration. +// - admission: conditional-cast gate and route mode switching. +// - listeners: TCP/Unix listener bind and accept-loop orchestration. +// - shutdown: graceful shutdown sequence and uptime logging. +mod helpers; +mod admission; +mod connectivity; +mod listeners; +mod me_startup; +mod runtime_tasks; +mod shutdown; +mod tls_bootstrap; + +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use tokio::sync::{RwLock, Semaphore, watch}; +use tracing::{error, info, warn}; +use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload}; + +use crate::api; +use crate::config::{LogLevel, ProxyConfig}; +use crate::crypto::SecureRandom; +use crate::ip_tracker::UserIpTracker; +use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe}; +use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController}; +use crate::stats::beobachten::BeobachtenStore; +use crate::stats::telemetry::TelemetryPolicy; +use crate::stats::{ReplayChecker, Stats}; +use crate::startup::{ + COMPONENT_API_BOOTSTRAP, COMPONENT_CONFIG_LOAD, + COMPONENT_ME_POOL_CONSTRUCT, COMPONENT_ME_POOL_INIT_STAGE1, + COMPONENT_ME_PROXY_CONFIG_V4, COMPONENT_ME_PROXY_CONFIG_V6, COMPONENT_ME_SECRET_FETCH, + COMPONENT_NETWORK_PROBE, COMPONENT_TRACING_INIT, StartupMeStatus, StartupTracker, +}; +use crate::stream::BufferPool; +use crate::transport::middle_proxy::MePool; +use crate::transport::UpstreamManager; +use helpers::parse_cli; + +/// Runs the full telemt runtime startup pipeline and blocks until shutdown. +pub async fn run() -> std::result::Result<(), Box> { + let process_started_at = Instant::now(); + let process_started_at_epoch_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let startup_tracker = Arc::new(StartupTracker::new(process_started_at_epoch_secs)); + startup_tracker + .start_component(COMPONENT_CONFIG_LOAD, Some("load and validate config".to_string())) + .await; + let (config_path, cli_silent, cli_log_level) = parse_cli(); + + let mut config = match ProxyConfig::load(&config_path) { + Ok(c) => c, + Err(e) => { + if std::path::Path::new(&config_path).exists() { + eprintln!("[telemt] Error: {}", e); + std::process::exit(1); + } else { + let default = ProxyConfig::default(); + std::fs::write(&config_path, toml::to_string_pretty(&default).unwrap()).unwrap(); + eprintln!("[telemt] Created default config at {}", config_path); + default + } + } + }; + + if let Err(e) = config.validate() { + eprintln!("[telemt] Invalid config: {}", e); + std::process::exit(1); + } + + if let Err(e) = crate::network::dns_overrides::install_entries(&config.network.dns_overrides) { + eprintln!("[telemt] Invalid network.dns_overrides: {}", e); + std::process::exit(1); + } + startup_tracker + .complete_component(COMPONENT_CONFIG_LOAD, Some("config is ready".to_string())) + .await; + + let has_rust_log = std::env::var("RUST_LOG").is_ok(); + let effective_log_level = if cli_silent { + LogLevel::Silent + } else if let Some(ref s) = cli_log_level { + LogLevel::from_str_loose(s) + } else { + config.general.log_level.clone() + }; + + let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info")); + startup_tracker + .start_component(COMPONENT_TRACING_INIT, Some("initialize tracing subscriber".to_string())) + .await; + + // Configure color output based on config + let fmt_layer = if config.general.disable_colors { + fmt::Layer::default().with_ansi(false) + } else { + fmt::Layer::default().with_ansi(true) + }; + + tracing_subscriber::registry() + .with(filter_layer) + .with(fmt_layer) + .init(); + startup_tracker + .complete_component(COMPONENT_TRACING_INIT, Some("tracing initialized".to_string())) + .await; + + info!("Telemt MTProxy v{}", env!("CARGO_PKG_VERSION")); + info!("Log level: {}", effective_log_level); + if config.general.disable_colors { + info!("Colors: disabled"); + } + info!( + "Modes: classic={} secure={} tls={}", + config.general.modes.classic, config.general.modes.secure, config.general.modes.tls + ); + if config.general.modes.classic { + warn!("Classic mode is vulnerable to DPI detection; enable only for legacy clients"); + } + info!("TLS domain: {}", config.censorship.tls_domain); + if let Some(ref sock) = config.censorship.mask_unix_sock { + info!("Mask: {} -> unix:{}", config.censorship.mask, sock); + if !std::path::Path::new(sock).exists() { + warn!( + "Unix socket '{}' does not exist yet. Masking will fail until it appears.", + sock + ); + } + } else { + info!( + "Mask: {} -> {}:{}", + config.censorship.mask, + config + .censorship + .mask_host + .as_deref() + .unwrap_or(&config.censorship.tls_domain), + config.censorship.mask_port + ); + } + + if config.censorship.tls_domain == "www.google.com" { + warn!("Using default tls_domain. Consider setting a custom domain."); + } + + let stats = Arc::new(Stats::new()); + stats.apply_telemetry_policy(TelemetryPolicy::from_config(&config.general.telemetry)); + + let upstream_manager = Arc::new(UpstreamManager::new( + config.upstreams.clone(), + config.general.upstream_connect_retry_attempts, + config.general.upstream_connect_retry_backoff_ms, + config.general.upstream_connect_budget_ms, + config.general.upstream_unhealthy_fail_threshold, + config.general.upstream_connect_failfast_hard_errors, + stats.clone(), + )); + let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.load_limits(&config.access.user_max_unique_ips).await; + ip_tracker + .set_limit_policy( + config.access.user_max_unique_ips_mode, + config.access.user_max_unique_ips_window_secs, + ) + .await; + if !config.access.user_max_unique_ips.is_empty() { + info!( + "IP limits configured for {} users", + config.access.user_max_unique_ips.len() + ); + } + if !config.network.dns_overrides.is_empty() { + info!( + "Runtime DNS overrides configured: {} entries", + config.network.dns_overrides.len() + ); + } + + let (api_config_tx, api_config_rx) = watch::channel(Arc::new(config.clone())); + let initial_admission_open = !config.general.use_middle_proxy; + let (admission_tx, admission_rx) = watch::channel(initial_admission_open); + let initial_route_mode = if config.general.use_middle_proxy { + RelayRouteMode::Middle + } else { + RelayRouteMode::Direct + }; + let route_runtime = Arc::new(RouteRuntimeController::new(initial_route_mode)); + let api_me_pool = Arc::new(RwLock::new(None::>)); + startup_tracker + .start_component(COMPONENT_API_BOOTSTRAP, Some("spawn API listener task".to_string())) + .await; + + if config.server.api.enabled { + let listen = match config.server.api.listen.parse::() { + Ok(listen) => listen, + Err(error) => { + warn!( + error = %error, + listen = %config.server.api.listen, + "Invalid server.api.listen; API is disabled" + ); + SocketAddr::from(([127, 0, 0, 1], 0)) + } + }; + if listen.port() != 0 { + let stats_api = stats.clone(); + let ip_tracker_api = ip_tracker.clone(); + let me_pool_api = api_me_pool.clone(); + let upstream_manager_api = upstream_manager.clone(); + let config_rx_api = api_config_rx.clone(); + let admission_rx_api = admission_rx.clone(); + let config_path_api = std::path::PathBuf::from(&config_path); + let startup_tracker_api = startup_tracker.clone(); + tokio::spawn(async move { + api::serve( + listen, + stats_api, + ip_tracker_api, + me_pool_api, + upstream_manager_api, + config_rx_api, + admission_rx_api, + config_path_api, + None, + None, + process_started_at_epoch_secs, + startup_tracker_api, + ) + .await; + }); + startup_tracker + .complete_component( + COMPONENT_API_BOOTSTRAP, + Some(format!("api task spawned on {}", listen)), + ) + .await; + } else { + startup_tracker + .skip_component( + COMPONENT_API_BOOTSTRAP, + Some("server.api.listen has zero port".to_string()), + ) + .await; + } + } else { + startup_tracker + .skip_component( + COMPONENT_API_BOOTSTRAP, + Some("server.api.enabled is false".to_string()), + ) + .await; + } + + let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len()); + tls_domains.push(config.censorship.tls_domain.clone()); + for d in &config.censorship.tls_domains { + if !tls_domains.contains(d) { + tls_domains.push(d.clone()); + } + } + + let tls_cache = tls_bootstrap::bootstrap_tls_front( + &config, + &tls_domains, + upstream_manager.clone(), + &startup_tracker, + ) + .await; + + startup_tracker + .start_component(COMPONENT_NETWORK_PROBE, Some("probe network capabilities".to_string())) + .await; + let probe = run_probe( + &config.network, + config.general.middle_proxy_nat_probe, + config.general.stun_nat_probe_concurrency, + ) + .await?; + let decision = decide_network_capabilities(&config.network, &probe); + log_probe_result(&probe, &decision); + startup_tracker + .complete_component( + COMPONENT_NETWORK_PROBE, + Some("network capabilities determined".to_string()), + ) + .await; + + let prefer_ipv6 = decision.prefer_ipv6(); + let mut use_middle_proxy = config.general.use_middle_proxy; + let beobachten = Arc::new(BeobachtenStore::new()); + let rng = Arc::new(SecureRandom::new()); + + // Connection concurrency limit + let max_connections = Arc::new(Semaphore::new(10_000)); + + let me2dc_fallback = config.general.me2dc_fallback; + let me_init_retry_attempts = config.general.me_init_retry_attempts; + if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me { + if me2dc_fallback { + warn!("No usable IP family for Middle Proxy detected; falling back to direct DC"); + use_middle_proxy = false; + } else { + warn!( + "No usable IP family for Middle Proxy detected; me2dc_fallback=false, ME init retries stay active" + ); + } + } + + if use_middle_proxy { + startup_tracker + .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_SECRET_FETCH) + .await; + startup_tracker + .start_component( + COMPONENT_ME_SECRET_FETCH, + Some("fetch proxy-secret from source/cache".to_string()), + ) + .await; + startup_tracker + .set_me_retry_limit(if !me2dc_fallback || me_init_retry_attempts == 0 { + "unlimited".to_string() + } else { + me_init_retry_attempts.to_string() + }) + .await; + } else { + startup_tracker + .set_me_status(StartupMeStatus::Skipped, "skipped") + .await; + startup_tracker + .skip_component( + COMPONENT_ME_SECRET_FETCH, + Some("middle proxy mode disabled".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_PROXY_CONFIG_V4, + Some("middle proxy mode disabled".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_PROXY_CONFIG_V6, + Some("middle proxy mode disabled".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_POOL_CONSTRUCT, + Some("middle proxy mode disabled".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_ME_POOL_INIT_STAGE1, + Some("middle proxy mode disabled".to_string()), + ) + .await; + } + + let me_pool: Option> = me_startup::initialize_me_pool( + use_middle_proxy, + &config, + &decision, + &probe, + &startup_tracker, + upstream_manager.clone(), + rng.clone(), + stats.clone(), + api_me_pool.clone(), + ) + .await; + + // If ME failed to initialize, force direct-only mode. + if me_pool.is_some() { + startup_tracker + .set_transport_mode("middle_proxy") + .await; + startup_tracker + .set_degraded(false) + .await; + info!("Transport: Middle-End Proxy - all DC-over-RPC"); + } else { + let _ = use_middle_proxy; + use_middle_proxy = false; + // Make runtime config reflect direct-only mode for handlers. + config.general.use_middle_proxy = false; + startup_tracker + .set_transport_mode("direct") + .await; + startup_tracker + .set_degraded(true) + .await; + if me2dc_fallback { + startup_tracker + .set_me_status(StartupMeStatus::Failed, "fallback_to_direct") + .await; + } else { + startup_tracker + .set_me_status(StartupMeStatus::Skipped, "skipped") + .await; + } + info!("Transport: Direct DC - TCP - standard DC-over-TCP"); + } + + // Freeze config after possible fallback decision + let config = Arc::new(config); + + let replay_checker = Arc::new(ReplayChecker::new( + config.access.replay_check_len, + Duration::from_secs(config.access.replay_window_secs), + )); + + let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096)); + + connectivity::run_startup_connectivity( + &config, + &me_pool, + rng.clone(), + &startup_tracker, + upstream_manager.clone(), + prefer_ipv6, + &decision, + process_started_at, + api_me_pool.clone(), + ) + .await; + + let runtime_watches = runtime_tasks::spawn_runtime_tasks( + &config, + &config_path, + &probe, + prefer_ipv6, + decision.ipv4_dc, + decision.ipv6_dc, + &startup_tracker, + stats.clone(), + upstream_manager.clone(), + replay_checker.clone(), + me_pool.clone(), + rng.clone(), + ip_tracker.clone(), + beobachten.clone(), + api_config_tx.clone(), + me_pool.clone(), + ) + .await; + let config_rx = runtime_watches.config_rx; + let log_level_rx = runtime_watches.log_level_rx; + let detected_ip_v4 = runtime_watches.detected_ip_v4; + let detected_ip_v6 = runtime_watches.detected_ip_v6; + + admission::configure_admission_gate( + &config, + me_pool.clone(), + route_runtime.clone(), + &admission_tx, + config_rx.clone(), + ) + .await; + let _admission_tx_hold = admission_tx; + + let bound = listeners::bind_listeners( + &config, + decision.ipv4_dc, + decision.ipv6_dc, + detected_ip_v4, + detected_ip_v6, + &startup_tracker, + config_rx.clone(), + admission_rx.clone(), + stats.clone(), + upstream_manager.clone(), + replay_checker.clone(), + buffer_pool.clone(), + rng.clone(), + me_pool.clone(), + route_runtime.clone(), + tls_cache.clone(), + ip_tracker.clone(), + beobachten.clone(), + max_connections.clone(), + ) + .await?; + let listeners = bound.listeners; + let has_unix_listener = bound.has_unix_listener; + + if listeners.is_empty() && !has_unix_listener { + error!("No listeners. Exiting."); + std::process::exit(1); + } + + runtime_tasks::apply_runtime_log_filter( + has_rust_log, + &effective_log_level, + filter_handle, + log_level_rx, + ) + .await; + + runtime_tasks::spawn_metrics_if_configured( + &config, + &startup_tracker, + stats.clone(), + beobachten.clone(), + ip_tracker.clone(), + config_rx.clone(), + ) + .await; + + runtime_tasks::mark_runtime_ready(&startup_tracker).await; + + listeners::spawn_tcp_accept_loops( + listeners, + config_rx.clone(), + admission_rx.clone(), + stats.clone(), + upstream_manager.clone(), + replay_checker.clone(), + buffer_pool.clone(), + rng.clone(), + me_pool.clone(), + route_runtime.clone(), + tls_cache.clone(), + ip_tracker.clone(), + beobachten.clone(), + max_connections.clone(), + ); + + shutdown::wait_for_shutdown(process_started_at, me_pool).await; + + Ok(()) +} diff --git a/src/maestro/runtime_tasks.rs b/src/maestro/runtime_tasks.rs new file mode 100644 index 0000000..c8aa534 --- /dev/null +++ b/src/maestro/runtime_tasks.rs @@ -0,0 +1,317 @@ +use std::net::IpAddr; +use std::path::PathBuf; +use std::sync::Arc; + +use tokio::sync::{mpsc, watch}; +use tracing::{debug, warn}; +use tracing_subscriber::reload; +use tracing_subscriber::EnvFilter; + +use crate::config::{LogLevel, ProxyConfig}; +use crate::config::hot_reload::spawn_config_watcher; +use crate::crypto::SecureRandom; +use crate::ip_tracker::UserIpTracker; +use crate::metrics; +use crate::network::probe::NetworkProbe; +use crate::startup::{COMPONENT_CONFIG_WATCHER_START, COMPONENT_METRICS_START, COMPONENT_RUNTIME_READY, StartupTracker}; +use crate::stats::beobachten::BeobachtenStore; +use crate::stats::telemetry::TelemetryPolicy; +use crate::stats::{ReplayChecker, Stats}; +use crate::transport::middle_proxy::{MePool, MeReinitTrigger}; +use crate::transport::UpstreamManager; + +use super::helpers::write_beobachten_snapshot; + +pub(crate) struct RuntimeWatches { + pub(crate) config_rx: watch::Receiver>, + pub(crate) log_level_rx: watch::Receiver, + pub(crate) detected_ip_v4: Option, + pub(crate) detected_ip_v6: Option, +} + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn spawn_runtime_tasks( + config: &Arc, + config_path: &str, + probe: &NetworkProbe, + prefer_ipv6: bool, + decision_ipv4_dc: bool, + decision_ipv6_dc: bool, + startup_tracker: &Arc, + stats: Arc, + upstream_manager: Arc, + replay_checker: Arc, + me_pool: Option>, + rng: Arc, + ip_tracker: Arc, + beobachten: Arc, + api_config_tx: watch::Sender>, + me_pool_for_policy: Option>, +) -> RuntimeWatches { + let um_clone = upstream_manager.clone(); + let dc_overrides_for_health = config.dc_overrides.clone(); + tokio::spawn(async move { + um_clone + .run_health_checks( + prefer_ipv6, + decision_ipv4_dc, + decision_ipv6_dc, + dc_overrides_for_health, + ) + .await; + }); + + let rc_clone = replay_checker.clone(); + tokio::spawn(async move { + rc_clone.run_periodic_cleanup().await; + }); + + let detected_ip_v4: Option = probe.detected_ipv4.map(IpAddr::V4); + let detected_ip_v6: Option = probe.detected_ipv6.map(IpAddr::V6); + debug!( + "Detected IPs: v4={:?} v6={:?}", + detected_ip_v4, detected_ip_v6 + ); + + startup_tracker + .start_component( + COMPONENT_CONFIG_WATCHER_START, + Some("spawn config hot-reload watcher".to_string()), + ) + .await; + let (config_rx, log_level_rx): ( + watch::Receiver>, + watch::Receiver, + ) = spawn_config_watcher( + PathBuf::from(config_path), + config.clone(), + detected_ip_v4, + detected_ip_v6, + ); + startup_tracker + .complete_component( + COMPONENT_CONFIG_WATCHER_START, + Some("config hot-reload watcher started".to_string()), + ) + .await; + let mut config_rx_api_bridge = config_rx.clone(); + let api_config_tx_bridge = api_config_tx.clone(); + tokio::spawn(async move { + loop { + if config_rx_api_bridge.changed().await.is_err() { + break; + } + let cfg = config_rx_api_bridge.borrow_and_update().clone(); + api_config_tx_bridge.send_replace(cfg); + } + }); + + let stats_policy = stats.clone(); + let mut config_rx_policy = config_rx.clone(); + tokio::spawn(async move { + loop { + if config_rx_policy.changed().await.is_err() { + break; + } + let cfg = config_rx_policy.borrow_and_update().clone(); + stats_policy.apply_telemetry_policy(TelemetryPolicy::from_config(&cfg.general.telemetry)); + if let Some(pool) = &me_pool_for_policy { + pool.update_runtime_transport_policy( + cfg.general.me_socks_kdf_policy, + cfg.general.me_route_backpressure_base_timeout_ms, + cfg.general.me_route_backpressure_high_timeout_ms, + cfg.general.me_route_backpressure_high_watermark_pct, + cfg.general.me_reader_route_data_wait_ms, + ); + } + } + }); + + let ip_tracker_policy = ip_tracker.clone(); + let mut config_rx_ip_limits = config_rx.clone(); + tokio::spawn(async move { + let mut prev_limits = config_rx_ip_limits.borrow().access.user_max_unique_ips.clone(); + let mut prev_mode = config_rx_ip_limits.borrow().access.user_max_unique_ips_mode; + let mut prev_window = config_rx_ip_limits + .borrow() + .access + .user_max_unique_ips_window_secs; + + loop { + if config_rx_ip_limits.changed().await.is_err() { + break; + } + let cfg = config_rx_ip_limits.borrow_and_update().clone(); + + if prev_limits != cfg.access.user_max_unique_ips { + ip_tracker_policy.load_limits(&cfg.access.user_max_unique_ips).await; + prev_limits = cfg.access.user_max_unique_ips.clone(); + } + + if prev_mode != cfg.access.user_max_unique_ips_mode + || prev_window != cfg.access.user_max_unique_ips_window_secs + { + ip_tracker_policy + .set_limit_policy( + cfg.access.user_max_unique_ips_mode, + cfg.access.user_max_unique_ips_window_secs, + ) + .await; + prev_mode = cfg.access.user_max_unique_ips_mode; + prev_window = cfg.access.user_max_unique_ips_window_secs; + } + } + }); + + let beobachten_writer = beobachten.clone(); + let config_rx_beobachten = config_rx.clone(); + tokio::spawn(async move { + loop { + let cfg = config_rx_beobachten.borrow().clone(); + let sleep_secs = cfg.general.beobachten_flush_secs.max(1); + + if cfg.general.beobachten { + let ttl = std::time::Duration::from_secs(cfg.general.beobachten_minutes.saturating_mul(60)); + let path = cfg.general.beobachten_file.clone(); + let snapshot = beobachten_writer.snapshot_text(ttl); + if let Err(e) = write_beobachten_snapshot(&path, &snapshot).await { + warn!(error = %e, path = %path, "Failed to flush beobachten snapshot"); + } + } + + tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await; + } + }); + + if let Some(pool) = me_pool { + let reinit_trigger_capacity = config.general.me_reinit_trigger_channel.max(1); + let (reinit_tx, reinit_rx) = mpsc::channel::(reinit_trigger_capacity); + + let pool_clone_sched = pool.clone(); + let rng_clone_sched = rng.clone(); + let config_rx_clone_sched = config_rx.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_reinit_scheduler( + pool_clone_sched, + rng_clone_sched, + config_rx_clone_sched, + reinit_rx, + ) + .await; + }); + + let pool_clone = pool.clone(); + let config_rx_clone = config_rx.clone(); + let reinit_tx_updater = reinit_tx.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_config_updater( + pool_clone, + config_rx_clone, + reinit_tx_updater, + ) + .await; + }); + + let config_rx_clone_rot = config_rx.clone(); + let reinit_tx_rotation = reinit_tx.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_rotation_task(config_rx_clone_rot, reinit_tx_rotation) + .await; + }); + } + + RuntimeWatches { + config_rx, + log_level_rx, + detected_ip_v4, + detected_ip_v6, + } +} + +pub(crate) async fn apply_runtime_log_filter( + has_rust_log: bool, + effective_log_level: &LogLevel, + filter_handle: reload::Handle, + mut log_level_rx: watch::Receiver, +) { + let runtime_filter = if has_rust_log { + EnvFilter::from_default_env() + } else if matches!(effective_log_level, LogLevel::Silent) { + EnvFilter::new("warn,telemt::links=info") + } else { + EnvFilter::new(effective_log_level.to_filter_str()) + }; + filter_handle + .reload(runtime_filter) + .expect("Failed to switch log filter"); + + tokio::spawn(async move { + loop { + if log_level_rx.changed().await.is_err() { + break; + } + let level = log_level_rx.borrow_and_update().clone(); + let new_filter = tracing_subscriber::EnvFilter::new(level.to_filter_str()); + if let Err(e) = filter_handle.reload(new_filter) { + tracing::error!("config reload: failed to update log filter: {}", e); + } + } + }); +} + +pub(crate) async fn spawn_metrics_if_configured( + config: &Arc, + startup_tracker: &Arc, + stats: Arc, + beobachten: Arc, + ip_tracker: Arc, + config_rx: watch::Receiver>, +) { + if let Some(port) = config.server.metrics_port { + startup_tracker + .start_component( + COMPONENT_METRICS_START, + Some(format!("spawn metrics endpoint on {}", port)), + ) + .await; + let stats = stats.clone(); + let beobachten = beobachten.clone(); + let config_rx_metrics = config_rx.clone(); + let ip_tracker_metrics = ip_tracker.clone(); + let whitelist = config.server.metrics_whitelist.clone(); + tokio::spawn(async move { + metrics::serve( + port, + stats, + beobachten, + ip_tracker_metrics, + config_rx_metrics, + whitelist, + ) + .await; + }); + startup_tracker + .complete_component( + COMPONENT_METRICS_START, + Some("metrics task spawned".to_string()), + ) + .await; + } else { + startup_tracker + .skip_component( + COMPONENT_METRICS_START, + Some("server.metrics_port is not configured".to_string()), + ) + .await; + } +} + +pub(crate) async fn mark_runtime_ready(startup_tracker: &Arc) { + startup_tracker + .complete_component( + COMPONENT_RUNTIME_READY, + Some("startup pipeline is fully initialized".to_string()), + ) + .await; + startup_tracker.mark_ready().await; +} diff --git a/src/maestro/shutdown.rs b/src/maestro/shutdown.rs new file mode 100644 index 0000000..b73df30 --- /dev/null +++ b/src/maestro/shutdown.rs @@ -0,0 +1,42 @@ +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use tokio::signal; +use tracing::{error, info, warn}; + +use crate::transport::middle_proxy::MePool; + +use super::helpers::{format_uptime, unit_label}; + +pub(crate) async fn wait_for_shutdown(process_started_at: Instant, me_pool: Option>) { + match signal::ctrl_c().await { + Ok(()) => { + let shutdown_started_at = Instant::now(); + info!("Shutting down..."); + let uptime_secs = process_started_at.elapsed().as_secs(); + info!("Uptime: {}", format_uptime(uptime_secs)); + if let Some(pool) = &me_pool { + match tokio::time::timeout(Duration::from_secs(2), pool.shutdown_send_close_conn_all()) + .await + { + Ok(total) => { + info!( + close_conn_sent = total, + "ME shutdown: RPC_CLOSE_CONN broadcast completed" + ); + } + Err(_) => { + warn!("ME shutdown: RPC_CLOSE_CONN broadcast timed out"); + } + } + } + let shutdown_secs = shutdown_started_at.elapsed().as_secs(); + info!( + "Shutdown completed successfully in {} {}.", + shutdown_secs, + unit_label(shutdown_secs, "second", "seconds") + ); + } + Err(e) => error!("Signal error: {}", e), + } +} diff --git a/src/maestro/tls_bootstrap.rs b/src/maestro/tls_bootstrap.rs new file mode 100644 index 0000000..a0b0b5a --- /dev/null +++ b/src/maestro/tls_bootstrap.rs @@ -0,0 +1,165 @@ +use std::sync::Arc; +use std::time::Duration; + +use rand::Rng; +use tracing::warn; + +use crate::config::ProxyConfig; +use crate::startup::{COMPONENT_TLS_FRONT_BOOTSTRAP, StartupTracker}; +use crate::tls_front::TlsFrontCache; +use crate::transport::UpstreamManager; + +pub(crate) async fn bootstrap_tls_front( + config: &ProxyConfig, + tls_domains: &[String], + upstream_manager: Arc, + startup_tracker: &Arc, +) -> Option> { + startup_tracker + .start_component( + COMPONENT_TLS_FRONT_BOOTSTRAP, + Some("initialize TLS front cache/bootstrap tasks".to_string()), + ) + .await; + + let tls_cache: Option> = if config.censorship.tls_emulation { + let cache = Arc::new(TlsFrontCache::new( + tls_domains, + config.censorship.fake_cert_len, + &config.censorship.tls_front_dir, + )); + cache.load_from_disk().await; + + let port = config.censorship.mask_port; + let proxy_protocol = config.censorship.mask_proxy_protocol; + let mask_host = config + .censorship + .mask_host + .clone() + .unwrap_or_else(|| config.censorship.tls_domain.clone()); + let mask_unix_sock = config.censorship.mask_unix_sock.clone(); + let fetch_timeout = Duration::from_secs(5); + + let cache_initial = cache.clone(); + let domains_initial = tls_domains.to_vec(); + let host_initial = mask_host.clone(); + let unix_sock_initial = mask_unix_sock.clone(); + let upstream_initial = upstream_manager.clone(); + tokio::spawn(async move { + let mut join = tokio::task::JoinSet::new(); + for domain in domains_initial { + let cache_domain = cache_initial.clone(); + let host_domain = host_initial.clone(); + let unix_sock_domain = unix_sock_initial.clone(); + let upstream_domain = upstream_initial.clone(); + join.spawn(async move { + match crate::tls_front::fetcher::fetch_real_tls( + &host_domain, + port, + &domain, + fetch_timeout, + Some(upstream_domain), + proxy_protocol, + unix_sock_domain.as_deref(), + ) + .await + { + Ok(res) => cache_domain.update_from_fetch(&domain, res).await, + Err(e) => { + warn!(domain = %domain, error = %e, "TLS emulation initial fetch failed") + } + } + }); + } + while let Some(res) = join.join_next().await { + if let Err(e) = res { + warn!(error = %e, "TLS emulation initial fetch task join failed"); + } + } + }); + + let cache_timeout = cache.clone(); + let domains_timeout = tls_domains.to_vec(); + let fake_cert_len = config.censorship.fake_cert_len; + tokio::spawn(async move { + tokio::time::sleep(fetch_timeout).await; + for domain in domains_timeout { + let cached = cache_timeout.get(&domain).await; + if cached.domain == "default" { + warn!( + domain = %domain, + timeout_secs = fetch_timeout.as_secs(), + fake_cert_len, + "TLS-front fetch not ready within timeout; using cache/default fake cert fallback" + ); + } + } + }); + + let cache_refresh = cache.clone(); + let domains_refresh = tls_domains.to_vec(); + let host_refresh = mask_host.clone(); + let unix_sock_refresh = mask_unix_sock.clone(); + let upstream_refresh = upstream_manager.clone(); + tokio::spawn(async move { + loop { + let base_secs = rand::rng().random_range(4 * 3600..=6 * 3600); + let jitter_secs = rand::rng().random_range(0..=7200); + tokio::time::sleep(Duration::from_secs(base_secs + jitter_secs)).await; + + let mut join = tokio::task::JoinSet::new(); + for domain in domains_refresh.clone() { + let cache_domain = cache_refresh.clone(); + let host_domain = host_refresh.clone(); + let unix_sock_domain = unix_sock_refresh.clone(); + let upstream_domain = upstream_refresh.clone(); + join.spawn(async move { + match crate::tls_front::fetcher::fetch_real_tls( + &host_domain, + port, + &domain, + fetch_timeout, + Some(upstream_domain), + proxy_protocol, + unix_sock_domain.as_deref(), + ) + .await + { + Ok(res) => cache_domain.update_from_fetch(&domain, res).await, + Err(e) => { + warn!(domain = %domain, error = %e, "TLS emulation refresh failed") + } + } + }); + } + + while let Some(res) = join.join_next().await { + if let Err(e) = res { + warn!(error = %e, "TLS emulation refresh task join failed"); + } + } + } + }); + + Some(cache) + } else { + startup_tracker + .skip_component( + COMPONENT_TLS_FRONT_BOOTSTRAP, + Some("censorship.tls_emulation is false".to_string()), + ) + .await; + None + }; + + if tls_cache.is_some() { + startup_tracker + .complete_component( + COMPONENT_TLS_FRONT_BOOTSTRAP, + Some("tls front cache is initialized".to_string()), + ) + .await; + } + + tls_cache +} diff --git a/src/main.rs b/src/main.rs index b8b5d60..73ada8c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,2270 +1,24 @@ //! telemt — Telegram MTProto Proxy -#![allow(unused_assignments)] - -use std::net::SocketAddr; -use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use rand::Rng; -use tokio::net::TcpListener; -use tokio::signal; -use tokio::sync::{RwLock, Semaphore, mpsc, watch}; -use tracing::{debug, error, info, warn}; -use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload}; -#[cfg(unix)] -use tokio::net::UnixListener; - -mod cli; mod api; +mod cli; mod config; mod crypto; mod error; mod ip_tracker; -mod network; +mod maestro; mod metrics; +mod network; mod protocol; mod proxy; +mod startup; mod stats; mod stream; -mod startup; -mod transport; mod tls_front; +mod transport; mod util; -use crate::config::{LogLevel, ProxyConfig}; -use crate::config::hot_reload::spawn_config_watcher; -use crate::crypto::SecureRandom; -use crate::ip_tracker::UserIpTracker; -use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe}; -use crate::proxy::ClientHandler; -use crate::proxy::route_mode::{ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteRuntimeController}; -use crate::stats::beobachten::BeobachtenStore; -use crate::stats::telemetry::TelemetryPolicy; -use crate::stats::{ReplayChecker, Stats}; -use crate::startup::{ - COMPONENT_API_BOOTSTRAP, COMPONENT_CONFIG_LOAD, COMPONENT_CONFIG_WATCHER_START, - COMPONENT_DC_CONNECTIVITY_PING, COMPONENT_LISTENERS_BIND, COMPONENT_ME_CONNECTIVITY_PING, - COMPONENT_ME_POOL_CONSTRUCT, COMPONENT_ME_POOL_INIT_STAGE1, COMPONENT_ME_PROXY_CONFIG_V4, - COMPONENT_ME_PROXY_CONFIG_V6, COMPONENT_ME_SECRET_FETCH, COMPONENT_METRICS_START, - COMPONENT_NETWORK_PROBE, COMPONENT_RUNTIME_READY, COMPONENT_TLS_FRONT_BOOTSTRAP, - COMPONENT_TRACING_INIT, StartupMeStatus, StartupTracker, -}; -use crate::stream::BufferPool; -use crate::transport::middle_proxy::{ - MePool, ProxyConfigData, fetch_proxy_config_with_raw, format_me_route, format_sample_line, - load_proxy_config_cache, run_me_ping, save_proxy_config_cache, MePingFamily, MePingSample, - MeReinitTrigger, -}; -use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes}; -use crate::tls_front::TlsFrontCache; - -fn parse_cli() -> (String, bool, Option) { - let mut config_path = "config.toml".to_string(); - let mut silent = false; - let mut log_level: Option = None; - - let args: Vec = std::env::args().skip(1).collect(); - - // Check for --init first (handled before tokio) - if let Some(init_opts) = cli::parse_init_args(&args) { - if let Err(e) = cli::run_init(init_opts) { - eprintln!("[telemt] Init failed: {}", e); - std::process::exit(1); - } - std::process::exit(0); - } - - let mut i = 0; - while i < args.len() { - match args[i].as_str() { - "--silent" | "-s" => { - silent = true; - } - "--log-level" => { - i += 1; - if i < args.len() { - log_level = Some(args[i].clone()); - } - } - s if s.starts_with("--log-level=") => { - log_level = Some(s.trim_start_matches("--log-level=").to_string()); - } - "--help" | "-h" => { - eprintln!("Usage: telemt [config.toml] [OPTIONS]"); - eprintln!(); - eprintln!("Options:"); - eprintln!(" --silent, -s Suppress info logs"); - eprintln!(" --log-level debug|verbose|normal|silent"); - eprintln!(" --help, -h Show this help"); - eprintln!(); - eprintln!("Setup (fire-and-forget):"); - eprintln!( - " --init Generate config, install systemd service, start" - ); - eprintln!(" --port Listen port (default: 443)"); - eprintln!( - " --domain TLS domain for masking (default: www.google.com)" - ); - eprintln!( - " --secret 32-char hex secret (auto-generated if omitted)" - ); - eprintln!(" --user Username (default: user)"); - eprintln!(" --config-dir Config directory (default: /etc/telemt)"); - eprintln!(" --no-start Don't start the service after install"); - std::process::exit(0); - } - "--version" | "-V" => { - println!("telemt {}", env!("CARGO_PKG_VERSION")); - std::process::exit(0); - } - s if !s.starts_with('-') => { - config_path = s.to_string(); - } - other => { - eprintln!("Unknown option: {}", other); - } - } - i += 1; - } - - (config_path, silent, log_level) -} - -fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { - info!(target: "telemt::links", "--- Proxy Links ({}) ---", host); - for user_name in config.general.links.show.resolve_users(&config.access.users) { - if let Some(secret) = config.access.users.get(user_name) { - info!(target: "telemt::links", "User: {}", user_name); - if config.general.modes.classic { - info!( - target: "telemt::links", - " Classic: tg://proxy?server={}&port={}&secret={}", - host, port, secret - ); - } - if config.general.modes.secure { - info!( - target: "telemt::links", - " DD: tg://proxy?server={}&port={}&secret=dd{}", - host, port, secret - ); - } - if config.general.modes.tls { - let mut domains = Vec::with_capacity(1 + config.censorship.tls_domains.len()); - domains.push(config.censorship.tls_domain.clone()); - for d in &config.censorship.tls_domains { - if !domains.contains(d) { - domains.push(d.clone()); - } - } - - for domain in domains { - let domain_hex = hex::encode(&domain); - info!( - target: "telemt::links", - " EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}", - host, port, secret, domain_hex - ); - } - } - } else { - warn!(target: "telemt::links", "User '{}' in show_link not found", user_name); - } - } - info!(target: "telemt::links", "------------------------"); -} - -async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result<()> { - if let Some(parent) = std::path::Path::new(path).parent() - && !parent.as_os_str().is_empty() - { - tokio::fs::create_dir_all(parent).await?; - } - tokio::fs::write(path, payload).await -} - -fn unit_label(value: u64, singular: &'static str, plural: &'static str) -> &'static str { - if value == 1 { singular } else { plural } -} - -fn format_uptime(total_secs: u64) -> String { - const SECS_PER_MINUTE: u64 = 60; - const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE; - const SECS_PER_DAY: u64 = 24 * SECS_PER_HOUR; - const SECS_PER_MONTH: u64 = 30 * SECS_PER_DAY; - const SECS_PER_YEAR: u64 = 12 * SECS_PER_MONTH; - - let mut remaining = total_secs; - let years = remaining / SECS_PER_YEAR; - remaining %= SECS_PER_YEAR; - let months = remaining / SECS_PER_MONTH; - remaining %= SECS_PER_MONTH; - let days = remaining / SECS_PER_DAY; - remaining %= SECS_PER_DAY; - let hours = remaining / SECS_PER_HOUR; - remaining %= SECS_PER_HOUR; - let minutes = remaining / SECS_PER_MINUTE; - let seconds = remaining % SECS_PER_MINUTE; - - let mut parts = Vec::new(); - if total_secs > SECS_PER_YEAR { - parts.push(format!( - "{} {}", - years, - unit_label(years, "year", "years") - )); - } - if total_secs > SECS_PER_MONTH { - parts.push(format!( - "{} {}", - months, - unit_label(months, "month", "months") - )); - } - if total_secs > SECS_PER_DAY { - parts.push(format!( - "{} {}", - days, - unit_label(days, "day", "days") - )); - } - if total_secs > SECS_PER_HOUR { - parts.push(format!( - "{} {}", - hours, - unit_label(hours, "hour", "hours") - )); - } - if total_secs > SECS_PER_MINUTE { - parts.push(format!( - "{} {}", - minutes, - unit_label(minutes, "minute", "minutes") - )); - } - parts.push(format!( - "{} {}", - seconds, - unit_label(seconds, "second", "seconds") - )); - - format!("{} / {} seconds", parts.join(", "), total_secs) -} - -async fn wait_until_admission_open(admission_rx: &mut watch::Receiver) -> bool { - loop { - if *admission_rx.borrow() { - return true; - } - if admission_rx.changed().await.is_err() { - return *admission_rx.borrow(); - } - } -} - -fn is_expected_handshake_eof(err: &crate::error::ProxyError) -> bool { - err.to_string().contains("expected 64 bytes, got 0") -} - -async fn load_startup_proxy_config_snapshot( - url: &str, - cache_path: Option<&str>, - me2dc_fallback: bool, - label: &'static str, -) -> Option { - loop { - match fetch_proxy_config_with_raw(url).await { - Ok((cfg, raw)) => { - if !cfg.map.is_empty() { - if let Some(path) = cache_path - && let Err(e) = save_proxy_config_cache(path, &raw).await - { - warn!(error = %e, path, snapshot = label, "Failed to store startup proxy-config cache"); - } - return Some(cfg); - } - - warn!(snapshot = label, url, "Startup proxy-config is empty; trying disk cache"); - if let Some(path) = cache_path { - match load_proxy_config_cache(path).await { - Ok(cached) if !cached.map.is_empty() => { - info!( - snapshot = label, - path, - proxy_for_lines = cached.proxy_for_lines, - "Loaded startup proxy-config from disk cache" - ); - return Some(cached); - } - Ok(_) => { - warn!( - snapshot = label, - path, - "Startup proxy-config cache is empty; ignoring cache file" - ); - } - Err(cache_err) => { - debug!( - snapshot = label, - path, - error = %cache_err, - "Startup proxy-config cache unavailable" - ); - } - } - } - - if me2dc_fallback { - error!( - snapshot = label, - "Startup proxy-config unavailable and no saved config found; falling back to direct mode" - ); - return None; - } - - warn!( - snapshot = label, - retry_in_secs = 2, - "Startup proxy-config unavailable and no saved config found; retrying because me2dc_fallback=false" - ); - tokio::time::sleep(Duration::from_secs(2)).await; - } - Err(fetch_err) => { - if let Some(path) = cache_path { - match load_proxy_config_cache(path).await { - Ok(cached) if !cached.map.is_empty() => { - info!( - snapshot = label, - path, - proxy_for_lines = cached.proxy_for_lines, - "Loaded startup proxy-config from disk cache" - ); - return Some(cached); - } - Ok(_) => { - warn!( - snapshot = label, - path, - "Startup proxy-config cache is empty; ignoring cache file" - ); - } - Err(cache_err) => { - debug!( - snapshot = label, - path, - error = %cache_err, - "Startup proxy-config cache unavailable" - ); - } - } - } - - if me2dc_fallback { - error!( - snapshot = label, - error = %fetch_err, - "Startup proxy-config unavailable and no cached data; falling back to direct mode" - ); - return None; - } - - warn!( - snapshot = label, - error = %fetch_err, - retry_in_secs = 2, - "Startup proxy-config unavailable; retrying because me2dc_fallback=false" - ); - tokio::time::sleep(Duration::from_secs(2)).await; - } - } - } -} - #[tokio::main] async fn main() -> std::result::Result<(), Box> { - let process_started_at = Instant::now(); - let process_started_at_epoch_secs = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_secs(); - let startup_tracker = Arc::new(StartupTracker::new(process_started_at_epoch_secs)); - startup_tracker - .start_component(COMPONENT_CONFIG_LOAD, Some("load and validate config".to_string())) - .await; - let (config_path, cli_silent, cli_log_level) = parse_cli(); - - let mut config = match ProxyConfig::load(&config_path) { - Ok(c) => c, - Err(e) => { - if std::path::Path::new(&config_path).exists() { - eprintln!("[telemt] Error: {}", e); - std::process::exit(1); - } else { - let default = ProxyConfig::default(); - std::fs::write(&config_path, toml::to_string_pretty(&default).unwrap()).unwrap(); - eprintln!("[telemt] Created default config at {}", config_path); - default - } - } - }; - - if let Err(e) = config.validate() { - eprintln!("[telemt] Invalid config: {}", e); - std::process::exit(1); - } - - if let Err(e) = crate::network::dns_overrides::install_entries(&config.network.dns_overrides) { - eprintln!("[telemt] Invalid network.dns_overrides: {}", e); - std::process::exit(1); - } - startup_tracker - .complete_component(COMPONENT_CONFIG_LOAD, Some("config is ready".to_string())) - .await; - - let has_rust_log = std::env::var("RUST_LOG").is_ok(); - let effective_log_level = if cli_silent { - LogLevel::Silent - } else if let Some(ref s) = cli_log_level { - LogLevel::from_str_loose(s) - } else { - config.general.log_level.clone() - }; - - let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info")); - startup_tracker - .start_component(COMPONENT_TRACING_INIT, Some("initialize tracing subscriber".to_string())) - .await; - - // Configure color output based on config - let fmt_layer = if config.general.disable_colors { - fmt::Layer::default().with_ansi(false) - } else { - fmt::Layer::default().with_ansi(true) - }; - - tracing_subscriber::registry() - .with(filter_layer) - .with(fmt_layer) - .init(); - startup_tracker - .complete_component(COMPONENT_TRACING_INIT, Some("tracing initialized".to_string())) - .await; - - info!("Telemt MTProxy v{}", env!("CARGO_PKG_VERSION")); - info!("Log level: {}", effective_log_level); - if config.general.disable_colors { - info!("Colors: disabled"); - } - info!( - "Modes: classic={} secure={} tls={}", - config.general.modes.classic, config.general.modes.secure, config.general.modes.tls - ); - if config.general.modes.classic { - warn!("Classic mode is vulnerable to DPI detection; enable only for legacy clients"); - } - info!("TLS domain: {}", config.censorship.tls_domain); - if let Some(ref sock) = config.censorship.mask_unix_sock { - info!("Mask: {} -> unix:{}", config.censorship.mask, sock); - if !std::path::Path::new(sock).exists() { - warn!( - "Unix socket '{}' does not exist yet. Masking will fail until it appears.", - sock - ); - } - } else { - info!( - "Mask: {} -> {}:{}", - config.censorship.mask, - config - .censorship - .mask_host - .as_deref() - .unwrap_or(&config.censorship.tls_domain), - config.censorship.mask_port - ); - } - - if config.censorship.tls_domain == "www.google.com" { - warn!("Using default tls_domain. Consider setting a custom domain."); - } - - let stats = Arc::new(Stats::new()); - stats.apply_telemetry_policy(TelemetryPolicy::from_config(&config.general.telemetry)); - - let upstream_manager = Arc::new(UpstreamManager::new( - config.upstreams.clone(), - config.general.upstream_connect_retry_attempts, - config.general.upstream_connect_retry_backoff_ms, - config.general.upstream_connect_budget_ms, - config.general.upstream_unhealthy_fail_threshold, - config.general.upstream_connect_failfast_hard_errors, - stats.clone(), - )); - let ip_tracker = Arc::new(UserIpTracker::new()); - ip_tracker.load_limits(&config.access.user_max_unique_ips).await; - ip_tracker - .set_limit_policy( - config.access.user_max_unique_ips_mode, - config.access.user_max_unique_ips_window_secs, - ) - .await; - if !config.access.user_max_unique_ips.is_empty() { - info!( - "IP limits configured for {} users", - config.access.user_max_unique_ips.len() - ); - } - if !config.network.dns_overrides.is_empty() { - info!( - "Runtime DNS overrides configured: {} entries", - config.network.dns_overrides.len() - ); - } - - let (api_config_tx, api_config_rx) = watch::channel(Arc::new(config.clone())); - let initial_admission_open = !config.general.use_middle_proxy; - let (admission_tx, admission_rx) = watch::channel(initial_admission_open); - let initial_route_mode = if config.general.use_middle_proxy { - RelayRouteMode::Middle - } else { - RelayRouteMode::Direct - }; - let route_runtime = Arc::new(RouteRuntimeController::new(initial_route_mode)); - let api_me_pool = Arc::new(RwLock::new(None::>)); - startup_tracker - .start_component(COMPONENT_API_BOOTSTRAP, Some("spawn API listener task".to_string())) - .await; - - if config.server.api.enabled { - let listen = match config.server.api.listen.parse::() { - Ok(listen) => listen, - Err(error) => { - warn!( - error = %error, - listen = %config.server.api.listen, - "Invalid server.api.listen; API is disabled" - ); - SocketAddr::from(([127, 0, 0, 1], 0)) - } - }; - if listen.port() != 0 { - let stats_api = stats.clone(); - let ip_tracker_api = ip_tracker.clone(); - let me_pool_api = api_me_pool.clone(); - let upstream_manager_api = upstream_manager.clone(); - let config_rx_api = api_config_rx.clone(); - let admission_rx_api = admission_rx.clone(); - let config_path_api = std::path::PathBuf::from(&config_path); - let startup_tracker_api = startup_tracker.clone(); - tokio::spawn(async move { - api::serve( - listen, - stats_api, - ip_tracker_api, - me_pool_api, - upstream_manager_api, - config_rx_api, - admission_rx_api, - config_path_api, - None, - None, - process_started_at_epoch_secs, - startup_tracker_api, - ) - .await; - }); - startup_tracker - .complete_component( - COMPONENT_API_BOOTSTRAP, - Some(format!("api task spawned on {}", listen)), - ) - .await; - } else { - startup_tracker - .skip_component( - COMPONENT_API_BOOTSTRAP, - Some("server.api.listen has zero port".to_string()), - ) - .await; - } - } else { - startup_tracker - .skip_component( - COMPONENT_API_BOOTSTRAP, - Some("server.api.enabled is false".to_string()), - ) - .await; - } - - let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len()); - tls_domains.push(config.censorship.tls_domain.clone()); - for d in &config.censorship.tls_domains { - if !tls_domains.contains(d) { - tls_domains.push(d.clone()); - } - } - - // Start TLS front fetching in background immediately, in parallel with STUN probing. - startup_tracker - .start_component( - COMPONENT_TLS_FRONT_BOOTSTRAP, - Some("initialize TLS front cache/bootstrap tasks".to_string()), - ) - .await; - let tls_cache: Option> = if config.censorship.tls_emulation { - let cache = Arc::new(TlsFrontCache::new( - &tls_domains, - config.censorship.fake_cert_len, - &config.censorship.tls_front_dir, - )); - cache.load_from_disk().await; - - let port = config.censorship.mask_port; - let proxy_protocol = config.censorship.mask_proxy_protocol; - let mask_host = config - .censorship - .mask_host - .clone() - .unwrap_or_else(|| config.censorship.tls_domain.clone()); - let mask_unix_sock = config.censorship.mask_unix_sock.clone(); - let fetch_timeout = Duration::from_secs(5); - - let cache_initial = cache.clone(); - let domains_initial = tls_domains.clone(); - let host_initial = mask_host.clone(); - let unix_sock_initial = mask_unix_sock.clone(); - let upstream_initial = upstream_manager.clone(); - tokio::spawn(async move { - let mut join = tokio::task::JoinSet::new(); - for domain in domains_initial { - let cache_domain = cache_initial.clone(); - let host_domain = host_initial.clone(); - let unix_sock_domain = unix_sock_initial.clone(); - let upstream_domain = upstream_initial.clone(); - join.spawn(async move { - match crate::tls_front::fetcher::fetch_real_tls( - &host_domain, - port, - &domain, - fetch_timeout, - Some(upstream_domain), - proxy_protocol, - unix_sock_domain.as_deref(), - ) - .await - { - Ok(res) => cache_domain.update_from_fetch(&domain, res).await, - Err(e) => { - warn!(domain = %domain, error = %e, "TLS emulation initial fetch failed") - } - } - }); - } - while let Some(res) = join.join_next().await { - if let Err(e) = res { - warn!(error = %e, "TLS emulation initial fetch task join failed"); - } - } - }); - - let cache_timeout = cache.clone(); - let domains_timeout = tls_domains.clone(); - let fake_cert_len = config.censorship.fake_cert_len; - tokio::spawn(async move { - tokio::time::sleep(fetch_timeout).await; - for domain in domains_timeout { - let cached = cache_timeout.get(&domain).await; - if cached.domain == "default" { - warn!( - domain = %domain, - timeout_secs = fetch_timeout.as_secs(), - fake_cert_len, - "TLS-front fetch not ready within timeout; using cache/default fake cert fallback" - ); - } - } - }); - - // Periodic refresh with jitter. - let cache_refresh = cache.clone(); - let domains_refresh = tls_domains.clone(); - let host_refresh = mask_host.clone(); - let unix_sock_refresh = mask_unix_sock.clone(); - let upstream_refresh = upstream_manager.clone(); - tokio::spawn(async move { - loop { - let base_secs = rand::rng().random_range(4 * 3600..=6 * 3600); - let jitter_secs = rand::rng().random_range(0..=7200); - tokio::time::sleep(Duration::from_secs(base_secs + jitter_secs)).await; - - let mut join = tokio::task::JoinSet::new(); - for domain in domains_refresh.clone() { - let cache_domain = cache_refresh.clone(); - let host_domain = host_refresh.clone(); - let unix_sock_domain = unix_sock_refresh.clone(); - let upstream_domain = upstream_refresh.clone(); - join.spawn(async move { - match crate::tls_front::fetcher::fetch_real_tls( - &host_domain, - port, - &domain, - fetch_timeout, - Some(upstream_domain), - proxy_protocol, - unix_sock_domain.as_deref(), - ) - .await - { - Ok(res) => cache_domain.update_from_fetch(&domain, res).await, - Err(e) => warn!(domain = %domain, error = %e, "TLS emulation refresh failed"), - } - }); - } - - while let Some(res) = join.join_next().await { - if let Err(e) = res { - warn!(error = %e, "TLS emulation refresh task join failed"); - } - } - } - }); - - Some(cache) - } else { - startup_tracker - .skip_component( - COMPONENT_TLS_FRONT_BOOTSTRAP, - Some("censorship.tls_emulation is false".to_string()), - ) - .await; - None - }; - if tls_cache.is_some() { - startup_tracker - .complete_component( - COMPONENT_TLS_FRONT_BOOTSTRAP, - Some("tls front cache is initialized".to_string()), - ) - .await; - } - - startup_tracker - .start_component(COMPONENT_NETWORK_PROBE, Some("probe network capabilities".to_string())) - .await; - let probe = run_probe( - &config.network, - config.general.middle_proxy_nat_probe, - config.general.stun_nat_probe_concurrency, - ) - .await?; - let decision = decide_network_capabilities(&config.network, &probe); - log_probe_result(&probe, &decision); - startup_tracker - .complete_component( - COMPONENT_NETWORK_PROBE, - Some("network capabilities determined".to_string()), - ) - .await; - - let prefer_ipv6 = decision.prefer_ipv6(); - let mut use_middle_proxy = config.general.use_middle_proxy; - let beobachten = Arc::new(BeobachtenStore::new()); - let rng = Arc::new(SecureRandom::new()); - - // Connection concurrency limit - let max_connections = Arc::new(Semaphore::new(10_000)); - - let me2dc_fallback = config.general.me2dc_fallback; - let me_init_retry_attempts = config.general.me_init_retry_attempts; - let me_init_warn_after_attempts: u32 = 3; - if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me { - if me2dc_fallback { - warn!("No usable IP family for Middle Proxy detected; falling back to direct DC"); - use_middle_proxy = false; - } else { - warn!( - "No usable IP family for Middle Proxy detected; me2dc_fallback=false, ME init retries stay active" - ); - } - } - - if use_middle_proxy { - startup_tracker - .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_SECRET_FETCH) - .await; - startup_tracker - .start_component( - COMPONENT_ME_SECRET_FETCH, - Some("fetch proxy-secret from source/cache".to_string()), - ) - .await; - startup_tracker - .set_me_retry_limit(if !me2dc_fallback || me_init_retry_attempts == 0 { - "unlimited".to_string() - } else { - me_init_retry_attempts.to_string() - }) - .await; - } else { - startup_tracker - .set_me_status(StartupMeStatus::Skipped, "skipped") - .await; - startup_tracker - .skip_component( - COMPONENT_ME_SECRET_FETCH, - Some("middle proxy mode disabled".to_string()), - ) - .await; - startup_tracker - .skip_component( - COMPONENT_ME_PROXY_CONFIG_V4, - Some("middle proxy mode disabled".to_string()), - ) - .await; - startup_tracker - .skip_component( - COMPONENT_ME_PROXY_CONFIG_V6, - Some("middle proxy mode disabled".to_string()), - ) - .await; - startup_tracker - .skip_component( - COMPONENT_ME_POOL_CONSTRUCT, - Some("middle proxy mode disabled".to_string()), - ) - .await; - startup_tracker - .skip_component( - COMPONENT_ME_POOL_INIT_STAGE1, - Some("middle proxy mode disabled".to_string()), - ) - .await; - } - - // ===================================================================== - // Middle Proxy initialization (if enabled) - // ===================================================================== - let me_pool: Option> = if use_middle_proxy { - info!("=== Middle Proxy Mode ==="); - let me_nat_probe = config.general.middle_proxy_nat_probe && config.network.stun_use; - if config.general.middle_proxy_nat_probe && !config.network.stun_use { - info!("Middle-proxy STUN probing disabled by network.stun_use=false"); - } - - // Global ad_tag (pool default). Used when user has no per-user tag in access.user_ad_tags. - let proxy_tag = config - .general - .ad_tag - .as_ref() - .map(|tag| hex::decode(tag).expect("general.ad_tag must be validated before startup")); - - // ============================================================= - // CRITICAL: Download Telegram proxy-secret (NOT user secret!) - // - // C MTProxy uses TWO separate secrets: - // -S flag = 16-byte user secret for client obfuscation - // --aes-pwd = 32-512 byte binary file for ME RPC auth - // - // proxy-secret is from: https://core.telegram.org/getProxySecret - // ============================================================= - let proxy_secret_path = config.general.proxy_secret_path.as_deref(); - let pool_size = config.general.middle_proxy_pool_size.max(1); - let proxy_secret = loop { - match crate::transport::middle_proxy::fetch_proxy_secret( - proxy_secret_path, - config.general.proxy_secret_len_max, - ) - .await - { - Ok(proxy_secret) => break Some(proxy_secret), - Err(e) => { - startup_tracker - .set_me_last_error(Some(e.to_string())) - .await; - if me2dc_fallback { - error!( - error = %e, - "ME startup failed: proxy-secret is unavailable and no saved secret found; falling back to direct mode" - ); - break None; - } - - warn!( - error = %e, - retry_in_secs = 2, - "ME startup failed: proxy-secret is unavailable and no saved secret found; retrying because me2dc_fallback=false" - ); - tokio::time::sleep(Duration::from_secs(2)).await; - } - } - }; - match proxy_secret { - Some(proxy_secret) => { - startup_tracker - .complete_component( - COMPONENT_ME_SECRET_FETCH, - Some("proxy-secret loaded".to_string()), - ) - .await; - info!( - secret_len = proxy_secret.len(), - key_sig = format_args!( - "0x{:08x}", - if proxy_secret.len() >= 4 { - u32::from_le_bytes([ - proxy_secret[0], - proxy_secret[1], - proxy_secret[2], - proxy_secret[3], - ]) - } else { - 0 - } - ), - "Proxy-secret loaded" - ); - - startup_tracker - .start_component( - COMPONENT_ME_PROXY_CONFIG_V4, - Some("load startup proxy-config v4".to_string()), - ) - .await; - startup_tracker - .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_PROXY_CONFIG_V4) - .await; - let cfg_v4 = load_startup_proxy_config_snapshot( - "https://core.telegram.org/getProxyConfig", - config.general.proxy_config_v4_cache_path.as_deref(), - me2dc_fallback, - "getProxyConfig", - ) - .await; - if cfg_v4.is_some() { - startup_tracker - .complete_component( - COMPONENT_ME_PROXY_CONFIG_V4, - Some("proxy-config v4 loaded".to_string()), - ) - .await; - } else { - startup_tracker - .fail_component( - COMPONENT_ME_PROXY_CONFIG_V4, - Some("proxy-config v4 unavailable".to_string()), - ) - .await; - } - startup_tracker - .start_component( - COMPONENT_ME_PROXY_CONFIG_V6, - Some("load startup proxy-config v6".to_string()), - ) - .await; - startup_tracker - .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_PROXY_CONFIG_V6) - .await; - let cfg_v6 = load_startup_proxy_config_snapshot( - "https://core.telegram.org/getProxyConfigV6", - config.general.proxy_config_v6_cache_path.as_deref(), - me2dc_fallback, - "getProxyConfigV6", - ) - .await; - if cfg_v6.is_some() { - startup_tracker - .complete_component( - COMPONENT_ME_PROXY_CONFIG_V6, - Some("proxy-config v6 loaded".to_string()), - ) - .await; - } else { - startup_tracker - .fail_component( - COMPONENT_ME_PROXY_CONFIG_V6, - Some("proxy-config v6 unavailable".to_string()), - ) - .await; - } - - if let (Some(cfg_v4), Some(cfg_v6)) = (cfg_v4, cfg_v6) { - startup_tracker - .start_component( - COMPONENT_ME_POOL_CONSTRUCT, - Some("construct ME pool".to_string()), - ) - .await; - startup_tracker - .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_POOL_CONSTRUCT) - .await; - let pool = MePool::new( - proxy_tag.clone(), - proxy_secret, - config.general.middle_proxy_nat_ip, - me_nat_probe, - None, - config.network.stun_servers.clone(), - config.general.stun_nat_probe_concurrency, - probe.detected_ipv6, - config.timeouts.me_one_retry, - config.timeouts.me_one_timeout_ms, - cfg_v4.map.clone(), - cfg_v6.map.clone(), - cfg_v4.default_dc.or(cfg_v6.default_dc), - decision.clone(), - Some(upstream_manager.clone()), - rng.clone(), - stats.clone(), - config.general.me_keepalive_enabled, - config.general.me_keepalive_interval_secs, - config.general.me_keepalive_jitter_secs, - config.general.me_keepalive_payload_random, - config.general.rpc_proxy_req_every, - config.general.me_warmup_stagger_enabled, - config.general.me_warmup_step_delay_ms, - config.general.me_warmup_step_jitter_ms, - config.general.me_reconnect_max_concurrent_per_dc, - config.general.me_reconnect_backoff_base_ms, - config.general.me_reconnect_backoff_cap_ms, - config.general.me_reconnect_fast_retry_count, - config.general.me_single_endpoint_shadow_writers, - config.general.me_single_endpoint_outage_mode_enabled, - config.general.me_single_endpoint_outage_disable_quarantine, - config.general.me_single_endpoint_outage_backoff_min_ms, - config.general.me_single_endpoint_outage_backoff_max_ms, - config.general.me_single_endpoint_shadow_rotate_every_secs, - config.general.me_floor_mode, - config.general.me_adaptive_floor_idle_secs, - config.general.me_adaptive_floor_min_writers_single_endpoint, - config.general.me_adaptive_floor_min_writers_multi_endpoint, - config.general.me_adaptive_floor_recover_grace_secs, - config.general.me_adaptive_floor_writers_per_core_total, - config.general.me_adaptive_floor_cpu_cores_override, - config.general.me_adaptive_floor_max_extra_writers_single_per_core, - config.general.me_adaptive_floor_max_extra_writers_multi_per_core, - config.general.me_adaptive_floor_max_active_writers_per_core, - config.general.me_adaptive_floor_max_warm_writers_per_core, - config.general.me_adaptive_floor_max_active_writers_global, - config.general.me_adaptive_floor_max_warm_writers_global, - config.general.hardswap, - config.general.me_pool_drain_ttl_secs, - config.general.effective_me_pool_force_close_secs(), - config.general.me_pool_min_fresh_ratio, - config.general.me_hardswap_warmup_delay_min_ms, - config.general.me_hardswap_warmup_delay_max_ms, - config.general.me_hardswap_warmup_extra_passes, - config.general.me_hardswap_warmup_pass_backoff_base_ms, - config.general.me_bind_stale_mode, - config.general.me_bind_stale_ttl_secs, - config.general.me_secret_atomic_snapshot, - config.general.me_deterministic_writer_sort, - config.general.me_writer_pick_mode, - config.general.me_writer_pick_sample_size, - config.general.me_socks_kdf_policy, - config.general.me_writer_cmd_channel_capacity, - config.general.me_route_channel_capacity, - config.general.me_route_backpressure_base_timeout_ms, - config.general.me_route_backpressure_high_timeout_ms, - config.general.me_route_backpressure_high_watermark_pct, - config.general.me_reader_route_data_wait_ms, - config.general.me_health_interval_ms_unhealthy, - config.general.me_health_interval_ms_healthy, - config.general.me_warn_rate_limit_ms, - config.general.me_route_no_writer_mode, - config.general.me_route_no_writer_wait_ms, - config.general.me_route_inline_recovery_attempts, - config.general.me_route_inline_recovery_wait_ms, - ); - startup_tracker - .complete_component( - COMPONENT_ME_POOL_CONSTRUCT, - Some("ME pool object created".to_string()), - ) - .await; - *api_me_pool.write().await = Some(pool.clone()); - startup_tracker - .start_component( - COMPONENT_ME_POOL_INIT_STAGE1, - Some("initialize ME pool writers".to_string()), - ) - .await; - startup_tracker - .set_me_status( - StartupMeStatus::Initializing, - COMPONENT_ME_POOL_INIT_STAGE1, - ) - .await; - - let mut init_attempt: u32 = 0; - loop { - init_attempt = init_attempt.saturating_add(1); - startup_tracker.set_me_init_attempt(init_attempt).await; - match pool.init(pool_size, &rng).await { - Ok(()) => { - startup_tracker - .set_me_last_error(None) - .await; - startup_tracker - .complete_component( - COMPONENT_ME_POOL_INIT_STAGE1, - Some("ME pool initialized".to_string()), - ) - .await; - startup_tracker - .set_me_status(StartupMeStatus::Ready, "ready") - .await; - info!( - attempt = init_attempt, - "Middle-End pool initialized successfully" - ); - - // Phase 4: Start health monitor - let pool_clone = pool.clone(); - let rng_clone = rng.clone(); - let min_conns = pool_size; - tokio::spawn(async move { - crate::transport::middle_proxy::me_health_monitor( - pool_clone, rng_clone, min_conns, - ) - .await; - }); - - break Some(pool); - } - Err(e) => { - startup_tracker - .set_me_last_error(Some(e.to_string())) - .await; - let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; - if retries_limited && init_attempt >= me_init_retry_attempts { - startup_tracker - .fail_component( - COMPONENT_ME_POOL_INIT_STAGE1, - Some("ME init retry budget exhausted".to_string()), - ) - .await; - startup_tracker - .set_me_status(StartupMeStatus::Failed, "failed") - .await; - error!( - error = %e, - attempt = init_attempt, - retry_limit = me_init_retry_attempts, - "ME pool init retries exhausted; falling back to direct mode" - ); - break None; - } - - let retry_limit = if !me2dc_fallback || me_init_retry_attempts == 0 { - String::from("unlimited") - } else { - me_init_retry_attempts.to_string() - }; - if init_attempt >= me_init_warn_after_attempts { - warn!( - error = %e, - attempt = init_attempt, - retry_limit = retry_limit, - me2dc_fallback = me2dc_fallback, - retry_in_secs = 2, - "ME pool is not ready yet; retrying startup initialization" - ); - } else { - info!( - error = %e, - attempt = init_attempt, - retry_limit = retry_limit, - me2dc_fallback = me2dc_fallback, - retry_in_secs = 2, - "ME pool startup warmup: retrying initialization" - ); - } - pool.reset_stun_state(); - tokio::time::sleep(Duration::from_secs(2)).await; - } - } - } - } else { - startup_tracker - .skip_component( - COMPONENT_ME_POOL_CONSTRUCT, - Some("ME configs are incomplete".to_string()), - ) - .await; - startup_tracker - .fail_component( - COMPONENT_ME_POOL_INIT_STAGE1, - Some("ME configs are incomplete".to_string()), - ) - .await; - startup_tracker - .set_me_status(StartupMeStatus::Failed, "failed") - .await; - None - } - } - None => { - startup_tracker - .fail_component( - COMPONENT_ME_SECRET_FETCH, - Some("proxy-secret unavailable".to_string()), - ) - .await; - startup_tracker - .skip_component( - COMPONENT_ME_PROXY_CONFIG_V4, - Some("proxy-secret unavailable".to_string()), - ) - .await; - startup_tracker - .skip_component( - COMPONENT_ME_PROXY_CONFIG_V6, - Some("proxy-secret unavailable".to_string()), - ) - .await; - startup_tracker - .skip_component( - COMPONENT_ME_POOL_CONSTRUCT, - Some("proxy-secret unavailable".to_string()), - ) - .await; - startup_tracker - .fail_component( - COMPONENT_ME_POOL_INIT_STAGE1, - Some("proxy-secret unavailable".to_string()), - ) - .await; - startup_tracker - .set_me_status(StartupMeStatus::Failed, "failed") - .await; - None - } - } - } else { - None - }; - - // If ME failed to initialize, force direct-only mode. - if me_pool.is_some() { - startup_tracker - .set_transport_mode("middle_proxy") - .await; - startup_tracker - .set_degraded(false) - .await; - info!("Transport: Middle-End Proxy - all DC-over-RPC"); - } else { - let _ = use_middle_proxy; - use_middle_proxy = false; - // Make runtime config reflect direct-only mode for handlers. - config.general.use_middle_proxy = false; - startup_tracker - .set_transport_mode("direct") - .await; - startup_tracker - .set_degraded(true) - .await; - if me2dc_fallback { - startup_tracker - .set_me_status(StartupMeStatus::Failed, "fallback_to_direct") - .await; - } else { - startup_tracker - .set_me_status(StartupMeStatus::Skipped, "skipped") - .await; - } - info!("Transport: Direct DC - TCP - standard DC-over-TCP"); - } - - // Freeze config after possible fallback decision - let config = Arc::new(config); - - let replay_checker = Arc::new(ReplayChecker::new( - config.access.replay_check_len, - Duration::from_secs(config.access.replay_window_secs), - )); - - let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096)); - - // Middle-End ping before DC connectivity - if me_pool.is_some() { - startup_tracker - .start_component( - COMPONENT_ME_CONNECTIVITY_PING, - Some("run startup ME connectivity check".to_string()), - ) - .await; - } else { - startup_tracker - .skip_component( - COMPONENT_ME_CONNECTIVITY_PING, - Some("ME pool is not available".to_string()), - ) - .await; - } - if let Some(ref pool) = me_pool { - let me_results = run_me_ping(pool, &rng).await; - - let v4_ok = me_results.iter().any(|r| { - matches!(r.family, MePingFamily::V4) - && r.samples.iter().any(|s| s.error.is_none() && s.handshake_ms.is_some()) - }); - let v6_ok = me_results.iter().any(|r| { - matches!(r.family, MePingFamily::V6) - && r.samples.iter().any(|s| s.error.is_none() && s.handshake_ms.is_some()) - }); - - info!("================= Telegram ME Connectivity ================="); - if v4_ok && v6_ok { - info!(" IPv4 and IPv6 available"); - } else if v4_ok { - info!(" IPv4 only / IPv6 unavailable"); - } else if v6_ok { - info!(" IPv6 only / IPv4 unavailable"); - } else { - info!(" No ME connectivity"); - } - let me_route = format_me_route( - &config.upstreams, - &me_results, - prefer_ipv6, - v4_ok, - v6_ok, - ) - .await; - info!(" via {}", me_route); - info!("============================================================"); - - use std::collections::BTreeMap; - let mut grouped: BTreeMap> = BTreeMap::new(); - for report in me_results { - for s in report.samples { - grouped.entry(s.dc).or_default().push(s); - } - } - - let family_order = if prefer_ipv6 { - vec![MePingFamily::V6, MePingFamily::V4] - } else { - vec![MePingFamily::V4, MePingFamily::V6] - }; - - for (dc, samples) in grouped { - for family in &family_order { - let fam_samples: Vec<&MePingSample> = samples - .iter() - .filter(|s| matches!(s.family, f if &f == family)) - .collect(); - if fam_samples.is_empty() { - continue; - } - - let fam_label = match family { - MePingFamily::V4 => "IPv4", - MePingFamily::V6 => "IPv6", - }; - info!(" DC{} [{}]", dc, fam_label); - for sample in fam_samples { - let line = format_sample_line(sample); - info!("{}", line); - } - } - } - info!("============================================================"); - startup_tracker - .complete_component( - COMPONENT_ME_CONNECTIVITY_PING, - Some("startup ME connectivity check completed".to_string()), - ) - .await; - } - - info!("================= Telegram DC Connectivity ================="); - startup_tracker - .start_component( - COMPONENT_DC_CONNECTIVITY_PING, - Some("run startup DC connectivity check".to_string()), - ) - .await; - - let ping_results = upstream_manager - .ping_all_dcs( - prefer_ipv6, - &config.dc_overrides, - decision.ipv4_dc, - decision.ipv6_dc, - ) - .await; - - for upstream_result in &ping_results { - let v6_works = upstream_result - .v6_results - .iter() - .any(|r| r.rtt_ms.is_some()); - let v4_works = upstream_result - .v4_results - .iter() - .any(|r| r.rtt_ms.is_some()); - - if upstream_result.both_available { - if prefer_ipv6 { - info!(" IPv6 in use / IPv4 is fallback"); - } else { - info!(" IPv4 in use / IPv6 is fallback"); - } - } else if v6_works && !v4_works { - info!(" IPv6 only / IPv4 unavailable"); - } else if v4_works && !v6_works { - info!(" IPv4 only / IPv6 unavailable"); - } else if !v6_works && !v4_works { - info!(" No DC connectivity"); - } - - info!(" via {}", upstream_result.upstream_name); - info!("============================================================"); - - // Print IPv6 results first (only if IPv6 is available) - if v6_works { - for dc in &upstream_result.v6_results { - let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port()); - match &dc.rtt_ms { - Some(rtt) => { - info!(" DC{} [IPv6] {} - {:.0} ms", dc.dc_idx, addr_str, rtt); - } - None => { - let err = dc.error.as_deref().unwrap_or("fail"); - info!(" DC{} [IPv6] {} - FAIL ({})", dc.dc_idx, addr_str, err); - } - } - } - - info!("============================================================"); - } - - // Print IPv4 results (only if IPv4 is available) - if v4_works { - for dc in &upstream_result.v4_results { - let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port()); - match &dc.rtt_ms { - Some(rtt) => { - info!( - " DC{} [IPv4] {}\t\t\t\t{:.0} ms", - dc.dc_idx, addr_str, rtt - ); - } - None => { - let err = dc.error.as_deref().unwrap_or("fail"); - info!( - " DC{} [IPv4] {}:\t\t\t\tFAIL ({})", - dc.dc_idx, addr_str, err - ); - } - } - } - - info!("============================================================"); - } - } - startup_tracker - .complete_component( - COMPONENT_DC_CONNECTIVITY_PING, - Some("startup DC connectivity check completed".to_string()), - ) - .await; - - let initialized_secs = process_started_at.elapsed().as_secs(); - let second_suffix = if initialized_secs == 1 { "" } else { "s" }; - startup_tracker - .start_component( - COMPONENT_RUNTIME_READY, - Some("finalize startup runtime state".to_string()), - ) - .await; - info!("===================== Telegram Startup ====================="); - info!( - " DC/ME Initialized in {} second{}", - initialized_secs, second_suffix - ); - info!("============================================================"); - - if let Some(ref pool) = me_pool { - pool.set_runtime_ready(true); - } - *api_me_pool.write().await = me_pool.clone(); - - // Background tasks - let um_clone = upstream_manager.clone(); - let decision_clone = decision.clone(); - let dc_overrides_for_health = config.dc_overrides.clone(); - tokio::spawn(async move { - um_clone - .run_health_checks( - prefer_ipv6, - decision_clone.ipv4_dc, - decision_clone.ipv6_dc, - dc_overrides_for_health, - ) - .await; - }); - - let rc_clone = replay_checker.clone(); - tokio::spawn(async move { - rc_clone.run_periodic_cleanup().await; - }); - - let detected_ip_v4: Option = probe.detected_ipv4.map(std::net::IpAddr::V4); - let detected_ip_v6: Option = probe.detected_ipv6.map(std::net::IpAddr::V6); - debug!( - "Detected IPs: v4={:?} v6={:?}", - detected_ip_v4, detected_ip_v6 - ); - - // ── Hot-reload watcher ──────────────────────────────────────────────── - // Uses inotify to detect file changes instantly (SIGHUP also works). - // detected_ip_v4/v6 are passed so newly added users get correct TG links. - startup_tracker - .start_component( - COMPONENT_CONFIG_WATCHER_START, - Some("spawn config hot-reload watcher".to_string()), - ) - .await; - let (config_rx, mut log_level_rx): ( - tokio::sync::watch::Receiver>, - tokio::sync::watch::Receiver, - ) = spawn_config_watcher( - std::path::PathBuf::from(&config_path), - config.clone(), - detected_ip_v4, - detected_ip_v6, - ); - startup_tracker - .complete_component( - COMPONENT_CONFIG_WATCHER_START, - Some("config hot-reload watcher started".to_string()), - ) - .await; - let mut config_rx_api_bridge = config_rx.clone(); - let api_config_tx_bridge = api_config_tx.clone(); - tokio::spawn(async move { - loop { - if config_rx_api_bridge.changed().await.is_err() { - break; - } - let cfg = config_rx_api_bridge.borrow_and_update().clone(); - api_config_tx_bridge.send_replace(cfg); - } - }); - - let stats_policy = stats.clone(); - let mut config_rx_policy = config_rx.clone(); - let me_pool_policy = me_pool.clone(); - tokio::spawn(async move { - loop { - if config_rx_policy.changed().await.is_err() { - break; - } - let cfg = config_rx_policy.borrow_and_update().clone(); - stats_policy.apply_telemetry_policy(TelemetryPolicy::from_config(&cfg.general.telemetry)); - if let Some(pool) = &me_pool_policy { - pool.update_runtime_transport_policy( - cfg.general.me_socks_kdf_policy, - cfg.general.me_route_backpressure_base_timeout_ms, - cfg.general.me_route_backpressure_high_timeout_ms, - cfg.general.me_route_backpressure_high_watermark_pct, - cfg.general.me_reader_route_data_wait_ms, - ); - } - } - }); - - let ip_tracker_policy = ip_tracker.clone(); - let mut config_rx_ip_limits = config_rx.clone(); - tokio::spawn(async move { - let mut prev_limits = config_rx_ip_limits - .borrow() - .access - .user_max_unique_ips - .clone(); - let mut prev_mode = config_rx_ip_limits - .borrow() - .access - .user_max_unique_ips_mode; - let mut prev_window = config_rx_ip_limits - .borrow() - .access - .user_max_unique_ips_window_secs; - - loop { - if config_rx_ip_limits.changed().await.is_err() { - break; - } - let cfg = config_rx_ip_limits.borrow_and_update().clone(); - - if prev_limits != cfg.access.user_max_unique_ips { - ip_tracker_policy - .load_limits(&cfg.access.user_max_unique_ips) - .await; - prev_limits = cfg.access.user_max_unique_ips.clone(); - } - - if prev_mode != cfg.access.user_max_unique_ips_mode - || prev_window != cfg.access.user_max_unique_ips_window_secs - { - ip_tracker_policy - .set_limit_policy( - cfg.access.user_max_unique_ips_mode, - cfg.access.user_max_unique_ips_window_secs, - ) - .await; - prev_mode = cfg.access.user_max_unique_ips_mode; - prev_window = cfg.access.user_max_unique_ips_window_secs; - } - } - }); - - let beobachten_writer = beobachten.clone(); - let config_rx_beobachten = config_rx.clone(); - tokio::spawn(async move { - loop { - let cfg = config_rx_beobachten.borrow().clone(); - let sleep_secs = cfg.general.beobachten_flush_secs.max(1); - - if cfg.general.beobachten { - let ttl = Duration::from_secs(cfg.general.beobachten_minutes.saturating_mul(60)); - let path = cfg.general.beobachten_file.clone(); - let snapshot = beobachten_writer.snapshot_text(ttl); - if let Err(e) = write_beobachten_snapshot(&path, &snapshot).await { - warn!(error = %e, path = %path, "Failed to flush beobachten snapshot"); - } - } - - tokio::time::sleep(Duration::from_secs(sleep_secs)).await; - } - }); - - if let Some(ref pool) = me_pool { - let reinit_trigger_capacity = config - .general - .me_reinit_trigger_channel - .max(1); - let (reinit_tx, reinit_rx) = mpsc::channel::(reinit_trigger_capacity); - - let pool_clone_sched = pool.clone(); - let rng_clone_sched = rng.clone(); - let config_rx_clone_sched = config_rx.clone(); - tokio::spawn(async move { - crate::transport::middle_proxy::me_reinit_scheduler( - pool_clone_sched, - rng_clone_sched, - config_rx_clone_sched, - reinit_rx, - ) - .await; - }); - - let pool_clone = pool.clone(); - let config_rx_clone = config_rx.clone(); - let reinit_tx_updater = reinit_tx.clone(); - tokio::spawn(async move { - crate::transport::middle_proxy::me_config_updater( - pool_clone, - config_rx_clone, - reinit_tx_updater, - ) - .await; - }); - - let config_rx_clone_rot = config_rx.clone(); - let reinit_tx_rotation = reinit_tx.clone(); - tokio::spawn(async move { - crate::transport::middle_proxy::me_rotation_task( - config_rx_clone_rot, - reinit_tx_rotation, - ) - .await; - }); - } - - startup_tracker - .start_component( - COMPONENT_LISTENERS_BIND, - Some("bind TCP/Unix listeners".to_string()), - ) - .await; - let mut listeners = Vec::new(); - - for listener_conf in &config.server.listeners { - let addr = SocketAddr::new(listener_conf.ip, config.server.port); - if addr.is_ipv4() && !decision.ipv4_dc { - warn!(%addr, "Skipping IPv4 listener: IPv4 disabled by [network]"); - continue; - } - if addr.is_ipv6() && !decision.ipv6_dc { - warn!(%addr, "Skipping IPv6 listener: IPv6 disabled by [network]"); - continue; - } - let options = ListenOptions { - reuse_port: listener_conf.reuse_allow, - ipv6_only: listener_conf.ip.is_ipv6(), - ..Default::default() - }; - - match create_listener(addr, &options) { - Ok(socket) => { - let listener = TcpListener::from_std(socket.into())?; - info!("Listening on {}", addr); - let listener_proxy_protocol = - listener_conf.proxy_protocol.unwrap_or(config.server.proxy_protocol); - - // Resolve the public host for link generation - let public_host = if let Some(ref announce) = listener_conf.announce { - announce.clone() // Use announce (IP or hostname) if explicitly set - } else if listener_conf.ip.is_unspecified() { - // Auto-detect for unspecified addresses - if listener_conf.ip.is_ipv4() { - detected_ip_v4 - .map(|ip| ip.to_string()) - .unwrap_or_else(|| listener_conf.ip.to_string()) - } else { - detected_ip_v6 - .map(|ip| ip.to_string()) - .unwrap_or_else(|| listener_conf.ip.to_string()) - } - } else { - listener_conf.ip.to_string() - }; - - // Show per-listener proxy links only when public_host is not set - if config.general.links.public_host.is_none() && !config.general.links.show.is_empty() { - let link_port = config.general.links.public_port.unwrap_or(config.server.port); - print_proxy_links(&public_host, link_port, &config); - } - - listeners.push((listener, listener_proxy_protocol)); - } - Err(e) => { - if e.kind() == std::io::ErrorKind::AddrInUse { - let owners = find_listener_processes(addr); - if owners.is_empty() { - error!( - %addr, - "Failed to bind: address already in use (owner process unresolved)" - ); - } else { - for owner in owners { - error!( - %addr, - pid = owner.pid, - process = %owner.process, - "Failed to bind: address already in use" - ); - } - } - - if !listener_conf.reuse_allow { - error!( - %addr, - "reuse_allow=false; set [[server.listeners]].reuse_allow=true to allow multi-instance listening" - ); - } - } else { - error!("Failed to bind to {}: {}", addr, e); - } - } - } - } - - // Show proxy links once when public_host is set, OR when there are no TCP listeners - // (unix-only mode) — use detected IP as fallback - if !config.general.links.show.is_empty() && (config.general.links.public_host.is_some() || listeners.is_empty()) { - let (host, port) = if let Some(ref h) = config.general.links.public_host { - (h.clone(), config.general.links.public_port.unwrap_or(config.server.port)) - } else { - let ip = detected_ip_v4 - .or(detected_ip_v6) - .map(|ip| ip.to_string()); - if ip.is_none() { - warn!("show_link is configured but public IP could not be detected. Set public_host in config."); - } - (ip.unwrap_or_else(|| "UNKNOWN".to_string()), config.general.links.public_port.unwrap_or(config.server.port)) - }; - - print_proxy_links(&host, port, &config); - } - - if config.general.use_middle_proxy { - if let Some(pool) = me_pool.as_ref() { - let fallback_after = Duration::from_secs(6); - let initial_ready = pool.admission_ready_conditional_cast().await; - admission_tx.send_replace(initial_ready); - let _ = route_runtime.set_mode(RelayRouteMode::Middle); - if initial_ready { - info!("Conditional-admission gate: open (ME pool ready)"); - } else { - warn!("Conditional-admission gate: closed (ME pool is not ready)"); - } - - let pool_for_gate = pool.clone(); - let admission_tx_gate = admission_tx.clone(); - let route_runtime_gate = route_runtime.clone(); - let mut config_rx_gate = config_rx.clone(); - let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1); - let mut fallback_enabled = config.general.me2dc_fallback; - tokio::spawn(async move { - let mut gate_open = initial_ready; - let mut route_mode = RelayRouteMode::Middle; - let mut not_ready_since = if initial_ready { - None - } else { - Some(Instant::now()) - }; - loop { - tokio::select! { - changed = config_rx_gate.changed() => { - if changed.is_err() { - break; - } - let cfg = config_rx_gate.borrow_and_update().clone(); - admission_poll_ms = cfg.general.me_admission_poll_ms.max(1); - fallback_enabled = cfg.general.me2dc_fallback; - continue; - } - _ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {} - } - let ready = pool_for_gate.admission_ready_conditional_cast().await; - let now = Instant::now(); - let (next_gate_open, next_route_mode, next_fallback_active) = if ready { - not_ready_since = None; - (true, RelayRouteMode::Middle, false) - } else { - let not_ready_started_at = *not_ready_since.get_or_insert(now); - let not_ready_for = now.saturating_duration_since(not_ready_started_at); - if fallback_enabled && not_ready_for > fallback_after { - (true, RelayRouteMode::Direct, true) - } else { - (false, RelayRouteMode::Middle, false) - } - }; - - if next_route_mode != route_mode { - route_mode = next_route_mode; - if let Some(snapshot) = route_runtime_gate.set_mode(route_mode) { - if matches!(route_mode, RelayRouteMode::Middle) { - info!( - target_mode = route_mode.as_str(), - cutover_generation = snapshot.generation, - "Middle-End routing restored for new sessions" - ); - } else { - warn!( - target_mode = route_mode.as_str(), - cutover_generation = snapshot.generation, - grace_secs = fallback_after.as_secs(), - "ME pool stayed not-ready beyond grace; routing new sessions via Direct-DC" - ); - } - } - } - - if next_gate_open != gate_open { - gate_open = next_gate_open; - admission_tx_gate.send_replace(gate_open); - if gate_open { - if next_fallback_active { - warn!("Conditional-admission gate opened in ME fallback mode"); - } else { - info!("Conditional-admission gate opened (ME pool ready)"); - } - } else { - warn!("Conditional-admission gate closed (ME pool is not ready)"); - } - } - - } - }); - } else { - admission_tx.send_replace(false); - let _ = route_runtime.set_mode(RelayRouteMode::Direct); - warn!("Conditional-admission gate: closed (ME pool is unavailable)"); - } - } else { - admission_tx.send_replace(true); - let _ = route_runtime.set_mode(RelayRouteMode::Direct); - } - let _admission_tx_hold = admission_tx; - - // Unix socket setup (before listeners check so unix-only config works) - let mut has_unix_listener = false; - #[cfg(unix)] - if let Some(ref unix_path) = config.server.listen_unix_sock { - // Remove stale socket file if present (standard practice) - let _ = tokio::fs::remove_file(unix_path).await; - - let unix_listener = UnixListener::bind(unix_path)?; - - // Apply socket permissions if configured - if let Some(ref perm_str) = config.server.listen_unix_sock_perm { - match u32::from_str_radix(perm_str.trim_start_matches('0'), 8) { - Ok(mode) => { - use std::os::unix::fs::PermissionsExt; - let perms = std::fs::Permissions::from_mode(mode); - if let Err(e) = std::fs::set_permissions(unix_path, perms) { - error!("Failed to set unix socket permissions to {}: {}", perm_str, e); - } else { - info!("Listening on unix:{} (mode {})", unix_path, perm_str); - } - } - Err(e) => { - warn!("Invalid listen_unix_sock_perm '{}': {}. Ignoring.", perm_str, e); - info!("Listening on unix:{}", unix_path); - } - } - } else { - info!("Listening on unix:{}", unix_path); - } - - has_unix_listener = true; - - let mut config_rx_unix: tokio::sync::watch::Receiver> = config_rx.clone(); - let mut admission_rx_unix = admission_rx.clone(); - let stats = stats.clone(); - let upstream_manager = upstream_manager.clone(); - let replay_checker = replay_checker.clone(); - let buffer_pool = buffer_pool.clone(); - let rng = rng.clone(); - let me_pool = me_pool.clone(); - let route_runtime = route_runtime.clone(); - let tls_cache = tls_cache.clone(); - let ip_tracker = ip_tracker.clone(); - let beobachten = beobachten.clone(); - let max_connections_unix = max_connections.clone(); - - tokio::spawn(async move { - let unix_conn_counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)); - - loop { - if !wait_until_admission_open(&mut admission_rx_unix).await { - warn!("Conditional-admission gate channel closed for unix listener"); - break; - } - match unix_listener.accept().await { - Ok((stream, _)) => { - let permit = match max_connections_unix.clone().acquire_owned().await { - Ok(permit) => permit, - Err(_) => { - error!("Connection limiter is closed"); - break; - } - }; - let conn_id = unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let fake_peer = SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16)); - - let config = config_rx_unix.borrow_and_update().clone(); - let stats = stats.clone(); - let upstream_manager = upstream_manager.clone(); - let replay_checker = replay_checker.clone(); - let buffer_pool = buffer_pool.clone(); - let rng = rng.clone(); - let me_pool = me_pool.clone(); - let route_runtime = route_runtime.clone(); - let tls_cache = tls_cache.clone(); - let ip_tracker = ip_tracker.clone(); - let beobachten = beobachten.clone(); - let proxy_protocol_enabled = config.server.proxy_protocol; - - tokio::spawn(async move { - let _permit = permit; - if let Err(e) = crate::proxy::client::handle_client_stream( - stream, fake_peer, config, stats, - upstream_manager, replay_checker, buffer_pool, rng, - me_pool, route_runtime, tls_cache, ip_tracker, beobachten, proxy_protocol_enabled, - ).await { - debug!(error = %e, "Unix socket connection error"); - } - }); - } - Err(e) => { - error!("Unix socket accept error: {}", e); - tokio::time::sleep(Duration::from_millis(100)).await; - } - } - } - }); - } - startup_tracker - .complete_component( - COMPONENT_LISTENERS_BIND, - Some(format!( - "listeners configured tcp={} unix={}", - listeners.len(), - has_unix_listener - )), - ) - .await; - - if listeners.is_empty() && !has_unix_listener { - error!("No listeners. Exiting."); - std::process::exit(1); - } - - // Switch to user-configured log level after startup - let runtime_filter = if has_rust_log { - EnvFilter::from_default_env() - } else if matches!(effective_log_level, LogLevel::Silent) { - EnvFilter::new("warn,telemt::links=info") - } else { - EnvFilter::new(effective_log_level.to_filter_str()) - }; - filter_handle - .reload(runtime_filter) - .expect("Failed to switch log filter"); - - // Apply log_level changes from hot-reload to the tracing filter. - tokio::spawn(async move { - loop { - if log_level_rx.changed().await.is_err() { - break; - } - let level = log_level_rx.borrow_and_update().clone(); - let new_filter = tracing_subscriber::EnvFilter::new(level.to_filter_str()); - if let Err(e) = filter_handle.reload(new_filter) { - tracing::error!("config reload: failed to update log filter: {}", e); - } - } - }); - - if let Some(port) = config.server.metrics_port { - startup_tracker - .start_component( - COMPONENT_METRICS_START, - Some(format!("spawn metrics endpoint on {}", port)), - ) - .await; - let stats = stats.clone(); - let beobachten = beobachten.clone(); - let config_rx_metrics = config_rx.clone(); - let ip_tracker_metrics = ip_tracker.clone(); - let whitelist = config.server.metrics_whitelist.clone(); - tokio::spawn(async move { - metrics::serve( - port, - stats, - beobachten, - ip_tracker_metrics, - config_rx_metrics, - whitelist, - ) - .await; - }); - startup_tracker - .complete_component( - COMPONENT_METRICS_START, - Some("metrics task spawned".to_string()), - ) - .await; - } else { - startup_tracker - .skip_component( - COMPONENT_METRICS_START, - Some("server.metrics_port is not configured".to_string()), - ) - .await; - } - - startup_tracker - .complete_component( - COMPONENT_RUNTIME_READY, - Some("startup pipeline is fully initialized".to_string()), - ) - .await; - startup_tracker.mark_ready().await; - - for (listener, listener_proxy_protocol) in listeners { - let mut config_rx: tokio::sync::watch::Receiver> = config_rx.clone(); - let mut admission_rx_tcp = admission_rx.clone(); - let stats = stats.clone(); - let upstream_manager = upstream_manager.clone(); - let replay_checker = replay_checker.clone(); - let buffer_pool = buffer_pool.clone(); - let rng = rng.clone(); - let me_pool = me_pool.clone(); - let route_runtime = route_runtime.clone(); - let tls_cache = tls_cache.clone(); - let ip_tracker = ip_tracker.clone(); - let beobachten = beobachten.clone(); - let max_connections_tcp = max_connections.clone(); - - tokio::spawn(async move { - loop { - if !wait_until_admission_open(&mut admission_rx_tcp).await { - warn!("Conditional-admission gate channel closed for tcp listener"); - break; - } - match listener.accept().await { - Ok((stream, peer_addr)) => { - let permit = match max_connections_tcp.clone().acquire_owned().await { - Ok(permit) => permit, - Err(_) => { - error!("Connection limiter is closed"); - break; - } - }; - let config = config_rx.borrow_and_update().clone(); - let stats = stats.clone(); - let upstream_manager = upstream_manager.clone(); - let replay_checker = replay_checker.clone(); - let buffer_pool = buffer_pool.clone(); - let rng = rng.clone(); - let me_pool = me_pool.clone(); - let route_runtime = route_runtime.clone(); - let tls_cache = tls_cache.clone(); - let ip_tracker = ip_tracker.clone(); - let beobachten = beobachten.clone(); - let proxy_protocol_enabled = listener_proxy_protocol; - let real_peer_report = Arc::new(std::sync::Mutex::new(None)); - let real_peer_report_for_handler = real_peer_report.clone(); - - tokio::spawn(async move { - let _permit = permit; - if let Err(e) = ClientHandler::new( - stream, - peer_addr, - config, - stats, - upstream_manager, - replay_checker, - buffer_pool, - rng, - me_pool, - route_runtime, - tls_cache, - ip_tracker, - beobachten, - proxy_protocol_enabled, - real_peer_report_for_handler, - ) - .run() - .await - { - let real_peer = match real_peer_report.lock() { - Ok(guard) => *guard, - Err(_) => None, - }; - let peer_closed = matches!( - &e, - crate::error::ProxyError::Io(ioe) - if matches!( - ioe.kind(), - std::io::ErrorKind::ConnectionReset - | std::io::ErrorKind::ConnectionAborted - | std::io::ErrorKind::BrokenPipe - | std::io::ErrorKind::NotConnected - ) - ) || matches!( - &e, - crate::error::ProxyError::Stream( - crate::error::StreamError::Io(ioe) - ) - if matches!( - ioe.kind(), - std::io::ErrorKind::ConnectionReset - | std::io::ErrorKind::ConnectionAborted - | std::io::ErrorKind::BrokenPipe - | std::io::ErrorKind::NotConnected - ) - ); - - let me_closed = matches!( - &e, - crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost" - ); - let route_switched = matches!( - &e, - crate::error::ProxyError::Proxy(msg) if msg == ROUTE_SWITCH_ERROR_MSG - ); - - match (peer_closed, me_closed) { - (true, _) => { - if let Some(real_peer) = real_peer { - debug!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed by client"); - } else { - debug!(peer = %peer_addr, error = %e, "Connection closed by client"); - } - } - (_, true) => { - if let Some(real_peer) = real_peer { - warn!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed: Middle-End dropped session"); - } else { - warn!(peer = %peer_addr, error = %e, "Connection closed: Middle-End dropped session"); - } - } - _ if route_switched => { - if let Some(real_peer) = real_peer { - info!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed by controlled route cutover"); - } else { - info!(peer = %peer_addr, error = %e, "Connection closed by controlled route cutover"); - } - } - _ if is_expected_handshake_eof(&e) => { - if let Some(real_peer) = real_peer { - info!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed during initial handshake"); - } else { - info!(peer = %peer_addr, error = %e, "Connection closed during initial handshake"); - } - } - _ => { - if let Some(real_peer) = real_peer { - warn!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed with error"); - } else { - warn!(peer = %peer_addr, error = %e, "Connection closed with error"); - } - } - } - } - }); - } - Err(e) => { - error!("Accept error: {}", e); - tokio::time::sleep(Duration::from_millis(100)).await; - } - } - } - }); - } - - match signal::ctrl_c().await { - Ok(()) => { - let shutdown_started_at = Instant::now(); - info!("Shutting down..."); - let uptime_secs = process_started_at.elapsed().as_secs(); - info!("Uptime: {}", format_uptime(uptime_secs)); - if let Some(pool) = &me_pool { - match tokio::time::timeout( - Duration::from_secs(2), - pool.shutdown_send_close_conn_all(), - ) - .await - { - Ok(total) => { - info!( - close_conn_sent = total, - "ME shutdown: RPC_CLOSE_CONN broadcast completed" - ); - } - Err(_) => { - warn!("ME shutdown: RPC_CLOSE_CONN broadcast timed out"); - } - } - } - let shutdown_secs = shutdown_started_at.elapsed().as_secs(); - info!( - "Shutdown completed successfully in {} {}.", - shutdown_secs, - unit_label(shutdown_secs, "second", "seconds") - ); - } - Err(e) => error!("Signal error: {}", e), - } - - Ok(()) + maestro::run().await } diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 4bc3751..99e6837 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -806,12 +806,8 @@ impl RunningClientHandler { }); } - let mut ip_reserved = false; - // IP limit check - match ip_tracker.check_and_add(user, peer_addr.ip()).await { - Ok(()) => { - ip_reserved = true; - } + let ip_reserved = match ip_tracker.check_and_add(user, peer_addr.ip()).await { + Ok(()) => true, Err(reason) => { warn!( user = %user, @@ -823,7 +819,8 @@ impl RunningClientHandler { user: user.to_string(), }); } - } + }; + // IP limit check if let Some(limit) = config.access.user_max_tcp_conns.get(user) && stats.get_user_curr_connects(user) >= *limit as u64 diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index efaa8ba..aaae1b3 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -382,7 +382,7 @@ where let mut batch_frames = 0usize; let mut batch_bytes = 0usize; - let mut flush_immediately = false; + let mut flush_immediately; match process_me_writer_response( first,