//! telemt — Telegram MTProto Proxy #![allow(unused_assignments)] use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use rand::Rng; use tokio::net::TcpListener; use tokio::signal; use tokio::sync::Semaphore; use tracing::{debug, error, info, warn}; use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload}; #[cfg(unix)] use tokio::net::UnixListener; mod cli; mod config; mod crypto; mod error; mod ip_tracker; mod network; mod metrics; mod protocol; mod proxy; mod stats; mod stream; mod transport; mod tls_front; mod util; use crate::config::{LogLevel, ProxyConfig}; use crate::config::hot_reload::spawn_config_watcher; use crate::crypto::SecureRandom; use crate::ip_tracker::UserIpTracker; use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe}; use crate::proxy::ClientHandler; use crate::stats::beobachten::BeobachtenStore; use crate::stats::telemetry::TelemetryPolicy; use crate::stats::{ReplayChecker, Stats}; use crate::stream::BufferPool; use crate::transport::middle_proxy::{ MePool, fetch_proxy_config, run_me_ping, MePingFamily, MePingSample, format_sample_line, format_me_route, }; use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes}; use crate::tls_front::TlsFrontCache; fn parse_cli() -> (String, bool, Option) { let mut config_path = "config.toml".to_string(); let mut silent = false; let mut log_level: Option = None; let args: Vec = std::env::args().skip(1).collect(); // Check for --init first (handled before tokio) if let Some(init_opts) = cli::parse_init_args(&args) { if let Err(e) = cli::run_init(init_opts) { eprintln!("[telemt] Init failed: {}", e); std::process::exit(1); } std::process::exit(0); } let mut i = 0; while i < args.len() { match args[i].as_str() { "--silent" | "-s" => { silent = true; } "--log-level" => { i += 1; if i < args.len() { log_level = Some(args[i].clone()); } } s if s.starts_with("--log-level=") => { log_level = Some(s.trim_start_matches("--log-level=").to_string()); } "--help" | "-h" => { eprintln!("Usage: telemt [config.toml] [OPTIONS]"); eprintln!(); eprintln!("Options:"); eprintln!(" --silent, -s Suppress info logs"); eprintln!(" --log-level debug|verbose|normal|silent"); eprintln!(" --help, -h Show this help"); eprintln!(); eprintln!("Setup (fire-and-forget):"); eprintln!( " --init Generate config, install systemd service, start" ); eprintln!(" --port Listen port (default: 443)"); eprintln!( " --domain TLS domain for masking (default: www.google.com)" ); eprintln!( " --secret 32-char hex secret (auto-generated if omitted)" ); eprintln!(" --user Username (default: user)"); eprintln!(" --config-dir Config directory (default: /etc/telemt)"); eprintln!(" --no-start Don't start the service after install"); std::process::exit(0); } "--version" | "-V" => { println!("telemt {}", env!("CARGO_PKG_VERSION")); std::process::exit(0); } s if !s.starts_with('-') => { config_path = s.to_string(); } other => { eprintln!("Unknown option: {}", other); } } i += 1; } (config_path, silent, log_level) } fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { info!(target: "telemt::links", "--- Proxy Links ({}) ---", host); for user_name in config.general.links.show.resolve_users(&config.access.users) { if let Some(secret) = config.access.users.get(user_name) { info!(target: "telemt::links", "User: {}", user_name); if config.general.modes.classic { info!( target: "telemt::links", " Classic: tg://proxy?server={}&port={}&secret={}", host, port, secret ); } if config.general.modes.secure { info!( target: "telemt::links", " DD: tg://proxy?server={}&port={}&secret=dd{}", host, port, secret ); } if config.general.modes.tls { let mut domains = Vec::with_capacity(1 + config.censorship.tls_domains.len()); domains.push(config.censorship.tls_domain.clone()); for d in &config.censorship.tls_domains { if !domains.contains(d) { domains.push(d.clone()); } } for domain in domains { let domain_hex = hex::encode(&domain); info!( target: "telemt::links", " EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}", host, port, secret, domain_hex ); } } } else { warn!(target: "telemt::links", "User '{}' in show_link not found", user_name); } } info!(target: "telemt::links", "------------------------"); } async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result<()> { if let Some(parent) = std::path::Path::new(path).parent() && !parent.as_os_str().is_empty() { tokio::fs::create_dir_all(parent).await?; } tokio::fs::write(path, payload).await } #[tokio::main] async fn main() -> std::result::Result<(), Box> { let (config_path, cli_silent, cli_log_level) = parse_cli(); let mut config = match ProxyConfig::load(&config_path) { Ok(c) => c, Err(e) => { if std::path::Path::new(&config_path).exists() { eprintln!("[telemt] Error: {}", e); std::process::exit(1); } else { let default = ProxyConfig::default(); std::fs::write(&config_path, toml::to_string_pretty(&default).unwrap()).unwrap(); eprintln!("[telemt] Created default config at {}", config_path); default } } }; if let Err(e) = config.validate() { eprintln!("[telemt] Invalid config: {}", e); std::process::exit(1); } if let Err(e) = crate::network::dns_overrides::install_entries(&config.network.dns_overrides) { eprintln!("[telemt] Invalid network.dns_overrides: {}", e); std::process::exit(1); } let has_rust_log = std::env::var("RUST_LOG").is_ok(); let effective_log_level = if cli_silent { LogLevel::Silent } else if let Some(ref s) = cli_log_level { LogLevel::from_str_loose(s) } else { config.general.log_level.clone() }; let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info")); // Configure color output based on config let fmt_layer = if config.general.disable_colors { fmt::Layer::default().with_ansi(false) } else { fmt::Layer::default().with_ansi(true) }; tracing_subscriber::registry() .with(filter_layer) .with(fmt_layer) .init(); info!("Telemt MTProxy v{}", env!("CARGO_PKG_VERSION")); info!("Log level: {}", effective_log_level); if config.general.disable_colors { info!("Colors: disabled"); } info!( "Modes: classic={} secure={} tls={}", config.general.modes.classic, config.general.modes.secure, config.general.modes.tls ); if config.general.modes.classic { warn!("Classic mode is vulnerable to DPI detection; enable only for legacy clients"); } info!("TLS domain: {}", config.censorship.tls_domain); if let Some(ref sock) = config.censorship.mask_unix_sock { info!("Mask: {} -> unix:{}", config.censorship.mask, sock); if !std::path::Path::new(sock).exists() { warn!( "Unix socket '{}' does not exist yet. Masking will fail until it appears.", sock ); } } else { info!( "Mask: {} -> {}:{}", config.censorship.mask, config .censorship .mask_host .as_deref() .unwrap_or(&config.censorship.tls_domain), config.censorship.mask_port ); } if config.censorship.tls_domain == "www.google.com" { warn!("Using default tls_domain. Consider setting a custom domain."); } let upstream_manager = Arc::new(UpstreamManager::new( config.upstreams.clone(), config.general.upstream_connect_retry_attempts, config.general.upstream_connect_retry_backoff_ms, config.general.upstream_unhealthy_fail_threshold, )); let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len()); tls_domains.push(config.censorship.tls_domain.clone()); for d in &config.censorship.tls_domains { if !tls_domains.contains(d) { tls_domains.push(d.clone()); } } // Start TLS front fetching in background immediately, in parallel with STUN probing. let tls_cache: Option> = if config.censorship.tls_emulation { let cache = Arc::new(TlsFrontCache::new( &tls_domains, config.censorship.fake_cert_len, &config.censorship.tls_front_dir, )); cache.load_from_disk().await; let port = config.censorship.mask_port; let proxy_protocol = config.censorship.mask_proxy_protocol; let mask_host = config .censorship .mask_host .clone() .unwrap_or_else(|| config.censorship.tls_domain.clone()); let mask_unix_sock = config.censorship.mask_unix_sock.clone(); let fetch_timeout = Duration::from_secs(5); let cache_initial = cache.clone(); let domains_initial = tls_domains.clone(); let host_initial = mask_host.clone(); let unix_sock_initial = mask_unix_sock.clone(); let upstream_initial = upstream_manager.clone(); tokio::spawn(async move { let mut join = tokio::task::JoinSet::new(); for domain in domains_initial { let cache_domain = cache_initial.clone(); let host_domain = host_initial.clone(); let unix_sock_domain = unix_sock_initial.clone(); let upstream_domain = upstream_initial.clone(); join.spawn(async move { match crate::tls_front::fetcher::fetch_real_tls( &host_domain, port, &domain, fetch_timeout, Some(upstream_domain), proxy_protocol, unix_sock_domain.as_deref(), ) .await { Ok(res) => cache_domain.update_from_fetch(&domain, res).await, Err(e) => { warn!(domain = %domain, error = %e, "TLS emulation initial fetch failed") } } }); } while let Some(res) = join.join_next().await { if let Err(e) = res { warn!(error = %e, "TLS emulation initial fetch task join failed"); } } }); let cache_timeout = cache.clone(); let domains_timeout = tls_domains.clone(); let fake_cert_len = config.censorship.fake_cert_len; tokio::spawn(async move { tokio::time::sleep(fetch_timeout).await; for domain in domains_timeout { let cached = cache_timeout.get(&domain).await; if cached.domain == "default" { warn!( domain = %domain, timeout_secs = fetch_timeout.as_secs(), fake_cert_len, "TLS-front fetch not ready within timeout; using cache/default fake cert fallback" ); } } }); // Periodic refresh with jitter. let cache_refresh = cache.clone(); let domains_refresh = tls_domains.clone(); let host_refresh = mask_host.clone(); let unix_sock_refresh = mask_unix_sock.clone(); let upstream_refresh = upstream_manager.clone(); tokio::spawn(async move { loop { let base_secs = rand::rng().random_range(4 * 3600..=6 * 3600); let jitter_secs = rand::rng().random_range(0..=7200); tokio::time::sleep(Duration::from_secs(base_secs + jitter_secs)).await; let mut join = tokio::task::JoinSet::new(); for domain in domains_refresh.clone() { let cache_domain = cache_refresh.clone(); let host_domain = host_refresh.clone(); let unix_sock_domain = unix_sock_refresh.clone(); let upstream_domain = upstream_refresh.clone(); join.spawn(async move { match crate::tls_front::fetcher::fetch_real_tls( &host_domain, port, &domain, fetch_timeout, Some(upstream_domain), proxy_protocol, unix_sock_domain.as_deref(), ) .await { Ok(res) => cache_domain.update_from_fetch(&domain, res).await, Err(e) => warn!(domain = %domain, error = %e, "TLS emulation refresh failed"), } }); } while let Some(res) = join.join_next().await { if let Err(e) = res { warn!(error = %e, "TLS emulation refresh task join failed"); } } } }); Some(cache) } else { None }; let probe = run_probe( &config.network, config.general.middle_proxy_nat_probe, config.general.stun_nat_probe_concurrency, ) .await?; let decision = decide_network_capabilities(&config.network, &probe); log_probe_result(&probe, &decision); let prefer_ipv6 = decision.prefer_ipv6(); let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me); let stats = Arc::new(Stats::new()); stats.apply_telemetry_policy(TelemetryPolicy::from_config(&config.general.telemetry)); let beobachten = Arc::new(BeobachtenStore::new()); let rng = Arc::new(SecureRandom::new()); // IP Tracker initialization let ip_tracker = Arc::new(UserIpTracker::new()); ip_tracker.load_limits(&config.access.user_max_unique_ips).await; if !config.access.user_max_unique_ips.is_empty() { info!("IP limits configured for {} users", config.access.user_max_unique_ips.len()); } if !config.network.dns_overrides.is_empty() { info!( "Runtime DNS overrides configured: {} entries", config.network.dns_overrides.len() ); } // Connection concurrency limit let max_connections = Arc::new(Semaphore::new(10_000)); if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me { warn!("No usable IP family for Middle Proxy detected; falling back to direct DC"); use_middle_proxy = false; } // ===================================================================== // Middle Proxy initialization (if enabled) // ===================================================================== let me_pool: Option> = if use_middle_proxy { info!("=== Middle Proxy Mode ==="); let me_nat_probe = config.general.middle_proxy_nat_probe && config.network.stun_use; if config.general.middle_proxy_nat_probe && !config.network.stun_use { info!("Middle-proxy STUN probing disabled by network.stun_use=false"); } // Global ad_tag (pool default). Used when user has no per-user tag in access.user_ad_tags. let proxy_tag = config .general .ad_tag .as_ref() .map(|tag| hex::decode(tag).expect("general.ad_tag must be validated before startup")); // ============================================================= // CRITICAL: Download Telegram proxy-secret (NOT user secret!) // // C MTProxy uses TWO separate secrets: // -S flag = 16-byte user secret for client obfuscation // --aes-pwd = 32-512 byte binary file for ME RPC auth // // proxy-secret is from: https://core.telegram.org/getProxySecret // ============================================================= let proxy_secret_path = config.general.proxy_secret_path.as_deref(); match crate::transport::middle_proxy::fetch_proxy_secret( proxy_secret_path, config.general.proxy_secret_len_max, ) .await { Ok(proxy_secret) => { info!( secret_len = proxy_secret.len(), key_sig = format_args!( "0x{:08x}", if proxy_secret.len() >= 4 { u32::from_le_bytes([ proxy_secret[0], proxy_secret[1], proxy_secret[2], proxy_secret[3], ]) } else { 0 } ), "Proxy-secret loaded" ); // Load ME config (v4/v6) + default DC let mut cfg_v4 = fetch_proxy_config( "https://core.telegram.org/getProxyConfig", ) .await .unwrap_or_default(); let mut cfg_v6 = fetch_proxy_config( "https://core.telegram.org/getProxyConfigV6", ) .await .unwrap_or_default(); if cfg_v4.map.is_empty() { cfg_v4.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V4.clone(); } if cfg_v6.map.is_empty() { cfg_v6.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V6.clone(); } let pool = MePool::new( proxy_tag, proxy_secret, config.general.middle_proxy_nat_ip, me_nat_probe, None, config.network.stun_servers.clone(), config.general.stun_nat_probe_concurrency, probe.detected_ipv6, config.timeouts.me_one_retry, config.timeouts.me_one_timeout_ms, cfg_v4.map.clone(), cfg_v6.map.clone(), cfg_v4.default_dc.or(cfg_v6.default_dc), decision.clone(), Some(upstream_manager.clone()), rng.clone(), stats.clone(), config.general.me_keepalive_enabled, config.general.me_keepalive_interval_secs, config.general.me_keepalive_jitter_secs, config.general.me_keepalive_payload_random, config.general.me_warmup_stagger_enabled, config.general.me_warmup_step_delay_ms, config.general.me_warmup_step_jitter_ms, config.general.me_reconnect_max_concurrent_per_dc, config.general.me_reconnect_backoff_base_ms, config.general.me_reconnect_backoff_cap_ms, config.general.me_reconnect_fast_retry_count, config.general.hardswap, config.general.me_pool_drain_ttl_secs, config.general.effective_me_pool_force_close_secs(), config.general.me_pool_min_fresh_ratio, config.general.me_hardswap_warmup_delay_min_ms, config.general.me_hardswap_warmup_delay_max_ms, config.general.me_hardswap_warmup_extra_passes, config.general.me_hardswap_warmup_pass_backoff_base_ms, config.general.me_socks_kdf_policy, config.general.me_route_backpressure_base_timeout_ms, config.general.me_route_backpressure_high_timeout_ms, config.general.me_route_backpressure_high_watermark_pct, ); let pool_size = config.general.middle_proxy_pool_size.max(1); loop { match pool.init(pool_size, &rng).await { Ok(()) => { info!("Middle-End pool initialized successfully"); // Phase 4: Start health monitor let pool_clone = pool.clone(); let rng_clone = rng.clone(); let min_conns = pool_size; tokio::spawn(async move { crate::transport::middle_proxy::me_health_monitor( pool_clone, rng_clone, min_conns, ) .await; }); break Some(pool); } Err(e) => { warn!( error = %e, retry_in_secs = 2, "ME pool is not ready yet; retrying startup initialization" ); pool.reset_stun_state(); tokio::time::sleep(Duration::from_secs(2)).await; } } } } Err(e) => { error!(error = %e, "Failed to fetch proxy-secret. Falling back to direct mode."); None } } } else { None }; // If ME failed to initialize, force direct-only mode. if me_pool.is_some() { info!("Transport: Middle-End Proxy - all DC-over-RPC"); } else { let _ = use_middle_proxy; use_middle_proxy = false; // Make runtime config reflect direct-only mode for handlers. config.general.use_middle_proxy = false; info!("Transport: Direct DC - TCP - standard DC-over-TCP"); } // Freeze config after possible fallback decision let config = Arc::new(config); let replay_checker = Arc::new(ReplayChecker::new( config.access.replay_check_len, Duration::from_secs(config.access.replay_window_secs), )); let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096)); // Middle-End ping before DC connectivity if let Some(ref pool) = me_pool { let me_results = run_me_ping(pool, &rng).await; let v4_ok = me_results.iter().any(|r| { matches!(r.family, MePingFamily::V4) && r.samples.iter().any(|s| s.error.is_none() && s.handshake_ms.is_some()) }); let v6_ok = me_results.iter().any(|r| { matches!(r.family, MePingFamily::V6) && r.samples.iter().any(|s| s.error.is_none() && s.handshake_ms.is_some()) }); info!("================= Telegram ME Connectivity ================="); if v4_ok && v6_ok { info!(" IPv4 and IPv6 available"); } else if v4_ok { info!(" IPv4 only / IPv6 unavailable"); } else if v6_ok { info!(" IPv6 only / IPv4 unavailable"); } else { info!(" No ME connectivity"); } let me_route = format_me_route( &config.upstreams, &me_results, prefer_ipv6, v4_ok, v6_ok, ) .await; info!(" via {}", me_route); info!("============================================================"); use std::collections::BTreeMap; let mut grouped: BTreeMap> = BTreeMap::new(); for report in me_results { for s in report.samples { let key = s.dc.abs(); grouped.entry(key).or_default().push(s); } } let family_order = if prefer_ipv6 { vec![(MePingFamily::V6, true), (MePingFamily::V6, false), (MePingFamily::V4, true), (MePingFamily::V4, false)] } else { vec![(MePingFamily::V4, true), (MePingFamily::V4, false), (MePingFamily::V6, true), (MePingFamily::V6, false)] }; for (dc_abs, samples) in grouped { for (family, is_pos) in &family_order { let fam_samples: Vec<&MePingSample> = samples .iter() .filter(|s| matches!(s.family, f if &f == family) && (s.dc >= 0) == *is_pos) .collect(); if fam_samples.is_empty() { continue; } let fam_label = match family { MePingFamily::V4 => "IPv4", MePingFamily::V6 => "IPv6", }; info!(" DC{} [{}]", dc_abs, fam_label); for sample in fam_samples { let line = format_sample_line(sample); info!("{}", line); } } } info!("============================================================"); } info!("================= Telegram DC Connectivity ================="); let ping_results = upstream_manager .ping_all_dcs( prefer_ipv6, &config.dc_overrides, decision.ipv4_dc, decision.ipv6_dc, ) .await; for upstream_result in &ping_results { let v6_works = upstream_result .v6_results .iter() .any(|r| r.rtt_ms.is_some()); let v4_works = upstream_result .v4_results .iter() .any(|r| r.rtt_ms.is_some()); if upstream_result.both_available { if prefer_ipv6 { info!(" IPv6 in use / IPv4 is fallback"); } else { info!(" IPv4 in use / IPv6 is fallback"); } } else if v6_works && !v4_works { info!(" IPv6 only / IPv4 unavailable"); } else if v4_works && !v6_works { info!(" IPv4 only / IPv6 unavailable"); } else if !v6_works && !v4_works { info!(" No DC connectivity"); } info!(" via {}", upstream_result.upstream_name); info!("============================================================"); // Print IPv6 results first (only if IPv6 is available) if v6_works { for dc in &upstream_result.v6_results { let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port()); match &dc.rtt_ms { Some(rtt) => { info!(" DC{} [IPv6] {} - {:.0} ms", dc.dc_idx, addr_str, rtt); } None => { let err = dc.error.as_deref().unwrap_or("fail"); info!(" DC{} [IPv6] {} - FAIL ({})", dc.dc_idx, addr_str, err); } } } info!("============================================================"); } // Print IPv4 results (only if IPv4 is available) if v4_works { for dc in &upstream_result.v4_results { let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port()); match &dc.rtt_ms { Some(rtt) => { info!( " DC{} [IPv4] {}\t\t\t\t{:.0} ms", dc.dc_idx, addr_str, rtt ); } None => { let err = dc.error.as_deref().unwrap_or("fail"); info!( " DC{} [IPv4] {}:\t\t\t\tFAIL ({})", dc.dc_idx, addr_str, err ); } } } info!("============================================================"); } } // Background tasks let um_clone = upstream_manager.clone(); let decision_clone = decision.clone(); let dc_overrides_for_health = config.dc_overrides.clone(); tokio::spawn(async move { um_clone .run_health_checks( prefer_ipv6, decision_clone.ipv4_dc, decision_clone.ipv6_dc, dc_overrides_for_health, ) .await; }); let rc_clone = replay_checker.clone(); tokio::spawn(async move { rc_clone.run_periodic_cleanup().await; }); let detected_ip_v4: Option = probe.detected_ipv4.map(std::net::IpAddr::V4); let detected_ip_v6: Option = probe.detected_ipv6.map(std::net::IpAddr::V6); debug!( "Detected IPs: v4={:?} v6={:?}", detected_ip_v4, detected_ip_v6 ); // ── Hot-reload watcher ──────────────────────────────────────────────── // Uses inotify to detect file changes instantly (SIGHUP also works). // detected_ip_v4/v6 are passed so newly added users get correct TG links. let (config_rx, mut log_level_rx): ( tokio::sync::watch::Receiver>, tokio::sync::watch::Receiver, ) = spawn_config_watcher( std::path::PathBuf::from(&config_path), config.clone(), detected_ip_v4, detected_ip_v6, ); let stats_policy = stats.clone(); let mut config_rx_policy = config_rx.clone(); let me_pool_policy = me_pool.clone(); tokio::spawn(async move { loop { if config_rx_policy.changed().await.is_err() { break; } let cfg = config_rx_policy.borrow_and_update().clone(); stats_policy.apply_telemetry_policy(TelemetryPolicy::from_config(&cfg.general.telemetry)); if let Some(pool) = &me_pool_policy { pool.update_runtime_transport_policy( cfg.general.me_socks_kdf_policy, cfg.general.me_route_backpressure_base_timeout_ms, cfg.general.me_route_backpressure_high_timeout_ms, cfg.general.me_route_backpressure_high_watermark_pct, ); } } }); let beobachten_writer = beobachten.clone(); let config_rx_beobachten = config_rx.clone(); tokio::spawn(async move { loop { let cfg = config_rx_beobachten.borrow().clone(); let sleep_secs = cfg.general.beobachten_flush_secs.max(1); if cfg.general.beobachten { let ttl = Duration::from_secs(cfg.general.beobachten_minutes.saturating_mul(60)); let path = cfg.general.beobachten_file.clone(); let snapshot = beobachten_writer.snapshot_text(ttl); if let Err(e) = write_beobachten_snapshot(&path, &snapshot).await { warn!(error = %e, path = %path, "Failed to flush beobachten snapshot"); } } tokio::time::sleep(Duration::from_secs(sleep_secs)).await; } }); if let Some(ref pool) = me_pool { let pool_clone = pool.clone(); let rng_clone = rng.clone(); let config_rx_clone = config_rx.clone(); tokio::spawn(async move { crate::transport::middle_proxy::me_config_updater( pool_clone, rng_clone, config_rx_clone, ) .await; }); let pool_clone_rot = pool.clone(); let rng_clone_rot = rng.clone(); let config_rx_clone_rot = config_rx.clone(); tokio::spawn(async move { crate::transport::middle_proxy::me_rotation_task( pool_clone_rot, rng_clone_rot, config_rx_clone_rot, ) .await; }); } let mut listeners = Vec::new(); for listener_conf in &config.server.listeners { let addr = SocketAddr::new(listener_conf.ip, config.server.port); if addr.is_ipv4() && !decision.ipv4_dc { warn!(%addr, "Skipping IPv4 listener: IPv4 disabled by [network]"); continue; } if addr.is_ipv6() && !decision.ipv6_dc { warn!(%addr, "Skipping IPv6 listener: IPv6 disabled by [network]"); continue; } let options = ListenOptions { reuse_port: listener_conf.reuse_allow, ipv6_only: listener_conf.ip.is_ipv6(), ..Default::default() }; match create_listener(addr, &options) { Ok(socket) => { let listener = TcpListener::from_std(socket.into())?; info!("Listening on {}", addr); let listener_proxy_protocol = listener_conf.proxy_protocol.unwrap_or(config.server.proxy_protocol); // Resolve the public host for link generation let public_host = if let Some(ref announce) = listener_conf.announce { announce.clone() // Use announce (IP or hostname) if explicitly set } else if listener_conf.ip.is_unspecified() { // Auto-detect for unspecified addresses if listener_conf.ip.is_ipv4() { detected_ip_v4 .map(|ip| ip.to_string()) .unwrap_or_else(|| listener_conf.ip.to_string()) } else { detected_ip_v6 .map(|ip| ip.to_string()) .unwrap_or_else(|| listener_conf.ip.to_string()) } } else { listener_conf.ip.to_string() }; // Show per-listener proxy links only when public_host is not set if config.general.links.public_host.is_none() && !config.general.links.show.is_empty() { let link_port = config.general.links.public_port.unwrap_or(config.server.port); print_proxy_links(&public_host, link_port, &config); } listeners.push((listener, listener_proxy_protocol)); } Err(e) => { if e.kind() == std::io::ErrorKind::AddrInUse { let owners = find_listener_processes(addr); if owners.is_empty() { error!( %addr, "Failed to bind: address already in use (owner process unresolved)" ); } else { for owner in owners { error!( %addr, pid = owner.pid, process = %owner.process, "Failed to bind: address already in use" ); } } if !listener_conf.reuse_allow { error!( %addr, "reuse_allow=false; set [[server.listeners]].reuse_allow=true to allow multi-instance listening" ); } } else { error!("Failed to bind to {}: {}", addr, e); } } } } // Show proxy links once when public_host is set, OR when there are no TCP listeners // (unix-only mode) — use detected IP as fallback if !config.general.links.show.is_empty() && (config.general.links.public_host.is_some() || listeners.is_empty()) { let (host, port) = if let Some(ref h) = config.general.links.public_host { (h.clone(), config.general.links.public_port.unwrap_or(config.server.port)) } else { let ip = detected_ip_v4 .or(detected_ip_v6) .map(|ip| ip.to_string()); if ip.is_none() { warn!("show_link is configured but public IP could not be detected. Set public_host in config."); } (ip.unwrap_or_else(|| "UNKNOWN".to_string()), config.general.links.public_port.unwrap_or(config.server.port)) }; print_proxy_links(&host, port, &config); } // Unix socket setup (before listeners check so unix-only config works) let mut has_unix_listener = false; #[cfg(unix)] if let Some(ref unix_path) = config.server.listen_unix_sock { // Remove stale socket file if present (standard practice) let _ = tokio::fs::remove_file(unix_path).await; let unix_listener = UnixListener::bind(unix_path)?; // Apply socket permissions if configured if let Some(ref perm_str) = config.server.listen_unix_sock_perm { match u32::from_str_radix(perm_str.trim_start_matches('0'), 8) { Ok(mode) => { use std::os::unix::fs::PermissionsExt; let perms = std::fs::Permissions::from_mode(mode); if let Err(e) = std::fs::set_permissions(unix_path, perms) { error!("Failed to set unix socket permissions to {}: {}", perm_str, e); } else { info!("Listening on unix:{} (mode {})", unix_path, perm_str); } } Err(e) => { warn!("Invalid listen_unix_sock_perm '{}': {}. Ignoring.", perm_str, e); info!("Listening on unix:{}", unix_path); } } } else { info!("Listening on unix:{}", unix_path); } has_unix_listener = true; let mut config_rx_unix: tokio::sync::watch::Receiver> = config_rx.clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); let replay_checker = replay_checker.clone(); let buffer_pool = buffer_pool.clone(); let rng = rng.clone(); let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); let beobachten = beobachten.clone(); let max_connections_unix = max_connections.clone(); tokio::spawn(async move { let unix_conn_counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)); loop { match unix_listener.accept().await { Ok((stream, _)) => { let permit = match max_connections_unix.clone().acquire_owned().await { Ok(permit) => permit, Err(_) => { error!("Connection limiter is closed"); break; } }; let conn_id = unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let fake_peer = SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16)); let config = config_rx_unix.borrow_and_update().clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); let replay_checker = replay_checker.clone(); let buffer_pool = buffer_pool.clone(); let rng = rng.clone(); let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); let beobachten = beobachten.clone(); let proxy_protocol_enabled = config.server.proxy_protocol; tokio::spawn(async move { let _permit = permit; if let Err(e) = crate::proxy::client::handle_client_stream( stream, fake_peer, config, stats, upstream_manager, replay_checker, buffer_pool, rng, me_pool, tls_cache, ip_tracker, beobachten, proxy_protocol_enabled, ).await { debug!(error = %e, "Unix socket connection error"); } }); } Err(e) => { error!("Unix socket accept error: {}", e); tokio::time::sleep(Duration::from_millis(100)).await; } } } }); } if listeners.is_empty() && !has_unix_listener { error!("No listeners. Exiting."); std::process::exit(1); } // Switch to user-configured log level after startup let runtime_filter = if has_rust_log { EnvFilter::from_default_env() } else if matches!(effective_log_level, LogLevel::Silent) { EnvFilter::new("warn,telemt::links=info") } else { EnvFilter::new(effective_log_level.to_filter_str()) }; filter_handle .reload(runtime_filter) .expect("Failed to switch log filter"); // Apply log_level changes from hot-reload to the tracing filter. tokio::spawn(async move { loop { if log_level_rx.changed().await.is_err() { break; } let level = log_level_rx.borrow_and_update().clone(); let new_filter = tracing_subscriber::EnvFilter::new(level.to_filter_str()); if let Err(e) = filter_handle.reload(new_filter) { tracing::error!("config reload: failed to update log filter: {}", e); } } }); if let Some(port) = config.server.metrics_port { let stats = stats.clone(); let beobachten = beobachten.clone(); let config_rx_metrics = config_rx.clone(); let ip_tracker_metrics = ip_tracker.clone(); let whitelist = config.server.metrics_whitelist.clone(); tokio::spawn(async move { metrics::serve( port, stats, beobachten, ip_tracker_metrics, config_rx_metrics, whitelist, ) .await; }); } for (listener, listener_proxy_protocol) in listeners { let mut config_rx: tokio::sync::watch::Receiver> = config_rx.clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); let replay_checker = replay_checker.clone(); let buffer_pool = buffer_pool.clone(); let rng = rng.clone(); let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); let beobachten = beobachten.clone(); let max_connections_tcp = max_connections.clone(); tokio::spawn(async move { loop { match listener.accept().await { Ok((stream, peer_addr)) => { let permit = match max_connections_tcp.clone().acquire_owned().await { Ok(permit) => permit, Err(_) => { error!("Connection limiter is closed"); break; } }; let config = config_rx.borrow_and_update().clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); let replay_checker = replay_checker.clone(); let buffer_pool = buffer_pool.clone(); let rng = rng.clone(); let me_pool = me_pool.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); let beobachten = beobachten.clone(); let proxy_protocol_enabled = listener_proxy_protocol; tokio::spawn(async move { let _permit = permit; if let Err(e) = ClientHandler::new( stream, peer_addr, config, stats, upstream_manager, replay_checker, buffer_pool, rng, me_pool, tls_cache, ip_tracker, beobachten, proxy_protocol_enabled, ) .run() .await { let peer_closed = matches!( &e, crate::error::ProxyError::Io(ioe) if matches!( ioe.kind(), std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::ConnectionAborted | std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::NotConnected ) ) || matches!( &e, crate::error::ProxyError::Stream( crate::error::StreamError::Io(ioe) ) if matches!( ioe.kind(), std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::ConnectionAborted | std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::NotConnected ) ); let me_closed = matches!( &e, crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost" ); match (peer_closed, me_closed) { (true, _) => debug!(peer = %peer_addr, error = %e, "Connection closed by client"), (_, true) => warn!(peer = %peer_addr, error = %e, "Connection closed: Middle-End dropped session"), _ => warn!(peer = %peer_addr, error = %e, "Connection closed with error"), } } }); } Err(e) => { error!("Accept error: {}", e); tokio::time::sleep(Duration::from_millis(100)).await; } } } }); } match signal::ctrl_c().await { Ok(()) => info!("Shutting down..."), Err(e) => error!("Signal error: {}", e), } Ok(()) }