diff --git a/docs/Architecture/API/API.md b/docs/Architecture/API/API.md index 589724d..f037fc7 100644 --- a/docs/Architecture/API/API.md +++ b/docs/Architecture/API/API.md @@ -307,7 +307,7 @@ An empty request body is accepted and generates a new secret automatically. | `route_mode` | `string` | Current route mode label from route runtime controller. | | `reroute_active` | `bool` | `true` when ME fallback currently routes new sessions to Direct-DC. | | `reroute_to_direct_at_epoch_secs` | `u64?` | Unix timestamp when current direct reroute began. | -| `reroute_reason` | `string?` | `fast_not_ready_fallback` or `strict_grace_fallback` while reroute is active. | +| `reroute_reason` | `string?` | `startup_direct_fallback`, `fast_not_ready_fallback`, or `strict_grace_fallback` while reroute is active. | | `startup_status` | `string` | Startup status (`pending`, `initializing`, `ready`, `failed`, `skipped`). | | `startup_stage` | `string` | Current startup stage identifier. | | `startup_progress_pct` | `f64` | Startup progress percentage (`0..100`). | @@ -1286,12 +1286,12 @@ Additional runtime endpoint behavior: ## ME Fallback Behavior Exposed Via API When `general.use_middle_proxy=true` and `general.me2dc_fallback=true`: -- Startup does not block on full ME pool readiness; initialization can continue in background. +- Startup opens Direct-DC routing first, then initializes ME in background and switches new sessions to Middle mode after ME readiness is observed. - Runtime initialization payload can expose ME stage `background_init` until pool becomes ready. - Admission/routing decision uses two readiness grace windows for "ME not ready" periods: - `80s` before first-ever readiness is observed (startup grace), + direct startup fallback before first-ever readiness is observed, `6s` after readiness has been observed at least once (runtime failover timeout). 
-- While in fallback window breach, new sessions are routed via Direct-DC; when ME becomes ready, routing returns to Middle mode for new sessions. +- While fallback is active, new sessions are routed via Direct-DC; when ME becomes ready, routing returns to Middle mode. Direct sessions affected by the cutover are closed with the existing staggered delay so clients reconnect through the current route. ## Serialization Rules diff --git a/docs/Config_params/CONFIG_PARAMS.en.md b/docs/Config_params/CONFIG_PARAMS.en.md index 8cd193e..61d305a 100644 --- a/docs/Config_params/CONFIG_PARAMS.en.md +++ b/docs/Config_params/CONFIG_PARAMS.en.md @@ -98,7 +98,7 @@ This document lists all configuration keys accepted by `config.toml`. | [`middle_proxy_warm_standby`](#middle_proxy_warm_standby) | `usize` | `16` | | [`me_init_retry_attempts`](#me_init_retry_attempts) | `u32` | `0` | | [`me2dc_fallback`](#me2dc_fallback) | `bool` | `true` | -| [`me2dc_fast`](#me2dc_fast) | `bool` | `false` | +| [`me2dc_fast`](#me2dc_fast) | `bool` | `true` | | [`me_keepalive_enabled`](#me_keepalive_enabled) | `bool` | `true` | | [`me_keepalive_interval_secs`](#me_keepalive_interval_secs) | `u64` | `8` | | [`me_keepalive_jitter_secs`](#me_keepalive_jitter_secs) | `u64` | `2` | @@ -392,7 +392,7 @@ This document lists all configuration keys accepted by `config.toml`. ``` ## me2dc_fallback - **Constraints / validation**: `bool`. - - **Description**: Allows fallback from ME mode to direct DC when ME startup fails. + - **Description**: Allows Direct-DC fallback when ME is unavailable. With `use_middle_proxy = true`, startup opens Direct-DC routing first and moves new sessions to ME after ME readiness is observed. - **Example**: ```toml @@ -401,14 +401,14 @@ This document lists all configuration keys accepted by `config.toml`. ``` ## me2dc_fast - **Constraints / validation**: `bool`. Active only when `use_middle_proxy = true` and `me2dc_fallback = true`. 
- - **Description**: Fast ME->Direct fallback mode for new sessions. + - **Description**: Fast ME->Direct fallback mode for new sessions after ME was ready at least once. Initial direct-first startup fallback is controlled by `me2dc_fallback`. - **Example**: ```toml [general] use_middle_proxy = true me2dc_fallback = true - me2dc_fast = false + me2dc_fast = true ``` ## me_keepalive_enabled - **Constraints / validation**: `bool`. @@ -2352,6 +2352,7 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche | [`mask`](#mask) | `bool` | `true` | | [`mask_host`](#mask_host) | `String` | — | | [`mask_port`](#mask_port) | `u16` | `443` | +| [`exclusive_mask`](#exclusive_mask) | `Map` | `{}` | | [`mask_unix_sock`](#mask_unix_sock) | `String` | — | | [`fake_cert_len`](#fake_cert_len) | `usize` | `2048` | | [`tls_emulation`](#tls_emulation) | `bool` | `true` | @@ -2459,6 +2460,18 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche [censorship] mask_port = 443 ``` +## exclusive_mask + - **Constraints / validation**: TOML map. Keys must be SNI domain names. Values must be `host:port` with `port > 0`; IPv6 literals must be bracketed. + - **Description**: Per-SNI TCP mask targets for fallback traffic. When a TLS ClientHello SNI matches a key, Telemt relays that unauthenticated connection to the mapped target. Other fallback traffic keeps using the existing `mask_host`/`mask_port` or SNI-aware default masking behavior. + - **Example**: + + ```toml + [censorship] + tls_domains = ["petrovich.ru", "bsi.bund.de", "telekom.com"] + + [censorship.exclusive_mask] + "bsi.bund.de" = "127.0.0.1:443" + ``` ## mask_unix_sock - **Constraints / validation**: `String` (optional). - Must not be empty when set. 
diff --git a/docs/Config_params/CONFIG_PARAMS.ru.md b/docs/Config_params/CONFIG_PARAMS.ru.md index 5006046..637e742 100644 --- a/docs/Config_params/CONFIG_PARAMS.ru.md +++ b/docs/Config_params/CONFIG_PARAMS.ru.md @@ -98,7 +98,7 @@ | [`middle_proxy_warm_standby`](#middle_proxy_warm_standby) | `usize` | `16` | | [`me_init_retry_attempts`](#me_init_retry_attempts) | `u32` | `0` | | [`me2dc_fallback`](#me2dc_fallback) | `bool` | `true` | -| [`me2dc_fast`](#me2dc_fast) | `bool` | `false` | +| [`me2dc_fast`](#me2dc_fast) | `bool` | `true` | | [`me_keepalive_enabled`](#me_keepalive_enabled) | `bool` | `true` | | [`me_keepalive_interval_secs`](#me_keepalive_interval_secs) | `u64` | `8` | | [`me_keepalive_jitter_secs`](#me_keepalive_jitter_secs) | `u64` | `2` | @@ -392,7 +392,7 @@ ``` ## me2dc_fallback - **Ограничения / валидация**: `bool`. - - **Описание**: Перейти из режима ME в режим прямого соединения (DC) в случае сбоя запуска ME. + - **Описание**: Разрешает fallback на прямой DC, когда ME недоступен. При `use_middle_proxy = true` запуск сначала открывает маршрутизацию через Direct-DC, а новые сеансы переводятся на ME после подтверждения готовности ME. - **Пример**: ```toml @@ -401,14 +401,14 @@ ``` ## me2dc_fast - **Ограничения / валидация**: `bool`. Используется только, когда `use_middle_proxy = true` и `me2dc_fallback = true`. - - **Описание**: Режим для быстрого перехода между режимами ME->DC для новых сеансов. + - **Описание**: Быстрый fallback ME->Direct для новых сеансов после того, как ME уже был готов хотя бы один раз. Начальный direct-first fallback управляется `me2dc_fallback`. - **Пример**: ```toml [general] use_middle_proxy = true me2dc_fallback = true - me2dc_fast = false + me2dc_fast = true ``` ## me_keepalive_enabled - **Ограничения / валидация**: `bool`. 
@@ -2358,6 +2358,7 @@ | [`mask`](#mask) | `bool` | `true` | | [`mask_host`](#mask_host) | `String` | — | | [`mask_port`](#mask_port) | `u16` | `443` | +| [`exclusive_mask`](#exclusive_mask) | `Map` | `{}` | | [`mask_unix_sock`](#mask_unix_sock) | `String` | — | | [`fake_cert_len`](#fake_cert_len) | `usize` | `2048` | | [`tls_emulation`](#tls_emulation) | `bool` | `true` | @@ -2464,6 +2465,18 @@ [censorship] mask_port = 443 ``` +## exclusive_mask + - **Ограничения / валидация**: TOML map. Ключи должны быть доменами SNI. Значения должны иметь формат `host:port`, где `port > 0`; IPv6 literals должны быть в квадратных скобках. + - **Описание**: Per-SNI TCP targets для fallback-трафика. Если SNI в TLS ClientHello совпадает с ключом, Telemt проксирует это неаутентифицированное соединение на указанный target. Остальной fallback-трафик продолжает использовать существующий `mask_host`/`mask_port` или SNI-aware default masking behavior. + - **Пример**: + + ```toml + [censorship] + tls_domains = ["petrovich.ru", "bsi.bund.de", "telekom.com"] + + [censorship.exclusive_mask] + "bsi.bund.de" = "127.0.0.1:443" + ``` ## mask_unix_sock - **Ограничения / валидация**: `String` (optional). - Значение не должно быть пустым, если задан. 
diff --git a/src/api/runtime_zero.rs b/src/api/runtime_zero.rs index d54c50f..b17615f 100644 --- a/src/api/runtime_zero.rs +++ b/src/api/runtime_zero.rs @@ -178,6 +178,7 @@ pub(super) async fn build_runtime_gates_data( cfg: &ProxyConfig, ) -> RuntimeGatesData { let startup_summary = build_runtime_startup_summary(shared).await; + let startup_snapshot = shared.startup_tracker.snapshot().await; let route_state = shared.route_runtime.snapshot(); let route_mode = route_state.mode.as_str(); let fast_fallback_enabled = @@ -191,7 +192,9 @@ pub(super) async fn build_runtime_gates_data( None }; let reroute_reason = if reroute_active { - if fast_fallback_enabled { + if startup_snapshot.me.status.as_str() != "ready" { + Some("startup_direct_fallback") + } else if fast_fallback_enabled { Some("fast_not_ready_fallback") } else { Some("strict_grace_fallback") diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index eb84ccd..8135d31 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -617,6 +617,7 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b || old.censorship.mask != new.censorship.mask || old.censorship.mask_host != new.censorship.mask_host || old.censorship.mask_port != new.censorship.mask_port + || old.censorship.exclusive_mask != new.censorship.exclusive_mask || old.censorship.mask_unix_sock != new.censorship.mask_unix_sock || old.censorship.fake_cert_len != new.censorship.fake_cert_len || old.censorship.tls_emulation != new.censorship.tls_emulation diff --git a/src/config/load.rs b/src/config/load.rs index 5a9b38c..6e40623 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -31,6 +31,30 @@ fn is_valid_tls_domain_name(domain: &str) -> bool { .any(|ch| ch.is_whitespace() || matches!(ch, '/' | '\\')) } +fn parse_exclusive_mask_target(target: &str) -> Option<(&str, u16)> { + let target = target.trim(); + if target.is_empty() { + return None; + } + + if target.starts_with('[') { + let end = 
target.find(']')?; + if target.get(end + 1..end + 2)? != ":" { + return None; + } + let host = &target[..=end]; + let port = target[end + 2..].parse::<u16>().ok()?; + return (port > 0).then_some((host, port)); + } + + let (host, port) = target.rsplit_once(':')?; + if host.is_empty() || host.contains(':') { + return None; + } + let port = port.parse::<u16>().ok()?; + (port > 0).then_some((host, port)) +} + const TOP_LEVEL_CONFIG_KEYS: &[&str] = &[ "general", "network", @@ -291,6 +315,7 @@ const CENSORSHIP_CONFIG_KEYS: &[&str] = &[ "mask", "mask_host", "mask_port", + "exclusive_mask", "mask_unix_sock", "fake_cert_len", "tls_emulation", @@ -1923,6 +1948,21 @@ impl ProxyConfig { config.censorship.mask_host = Some(config.censorship.tls_domain.clone()); } + for (domain, target) in &config.censorship.exclusive_mask { + if !is_valid_tls_domain_name(domain) { + return Err(ProxyError::Config(format!( + "Invalid censorship.exclusive_mask domain: '{}'. Must be a valid domain name", + domain + ))); + } + if parse_exclusive_mask_target(target).is_none() { + return Err(ProxyError::Config(format!( + "Invalid censorship.exclusive_mask target for '{}': '{}'. Expected host:port with port > 0", + domain, target + ))); + } + } + // Normalize optional TLS fetch scope: whitespace-only values disable scoped routing. config.censorship.tls_fetch_scope = config.censorship.tls_fetch_scope.trim().to_string(); @@ -2126,6 +2166,21 @@ impl ProxyConfig { } } + for (domain, target) in &self.censorship.exclusive_mask { + if !is_valid_tls_domain_name(domain) { + return Err(ProxyError::Config(format!( + "Invalid censorship.exclusive_mask domain: '{}'. Must be a valid domain name", + domain + ))); + } + if parse_exclusive_mask_target(target).is_none() { + return Err(ProxyError::Config(format!( + "Invalid censorship.exclusive_mask target for '{}': '{}'. 
Expected host:port with port > 0", + domain, target + ))); + } + } + for (user, tag) in &self.access.user_ad_tags { let zeros = "00000000000000000000000000000000"; if !is_valid_ad_tag(tag) { @@ -2667,6 +2722,32 @@ mod tests { ); } + #[test] + fn exclusive_mask_parses_domain_target_map() { + let cfg = load_config_from_temp_toml( + r#" + [general] + [network] + [server] + [access] + [censorship] + tls_domain = "example.com" + [censorship.exclusive_mask] + "my-site.com" = "127.0.0.1:8443" + "ipv6.example" = "[::1]:9443" + "#, + ); + + assert_eq!( + cfg.censorship.exclusive_mask.get("my-site.com"), + Some(&"127.0.0.1:8443".to_string()) + ); + assert_eq!( + cfg.censorship.exclusive_mask.get("ipv6.example"), + Some(&"[::1]:9443".to_string()) + ); + } + #[test] fn api_gray_action_parses_and_defaults_to_drop() { let cfg_default: ProxyConfig = toml::from_str( diff --git a/src/config/types.rs b/src/config/types.rs index d20dd7e..e118cf4 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -1719,6 +1719,10 @@ pub struct AntiCensorshipConfig { #[serde(default = "default_mask_port")] pub mask_port: u16, + /// Per-SNI TCP mask targets. Keys are SNI domains, values are `host:port`. 
+ #[serde(default)] + pub exclusive_mask: HashMap, + #[serde(default)] pub mask_unix_sock: Option, @@ -1842,6 +1846,7 @@ impl Default for AntiCensorshipConfig { mask: default_true(), mask_host: None, mask_port: default_mask_port(), + exclusive_mask: HashMap::new(), mask_unix_sock: None, fake_cert_len: default_fake_cert_len(), tls_emulation: true, diff --git a/src/maestro/admission.rs b/src/maestro/admission.rs index e9b1cbc..fb25c45 100644 --- a/src/maestro/admission.rs +++ b/src/maestro/admission.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::watch; +use tokio::sync::{RwLock, watch}; use tracing::{info, warn}; use crate::config::ProxyConfig; @@ -14,24 +14,32 @@ const RUNTIME_FALLBACK_AFTER: Duration = Duration::from_secs(6); pub(crate) async fn configure_admission_gate( config: &Arc, me_pool: Option>, + me_pool_runtime: Arc>>>, route_runtime: Arc, admission_tx: &watch::Sender, config_rx: watch::Receiver>, me_ready_rx: watch::Receiver, ) { if config.general.use_middle_proxy { - if let Some(pool) = me_pool.as_ref() { - let initial_ready = pool.admission_ready_conditional_cast().await; + if me_pool.is_some() || config.general.me2dc_fallback { + let initial_pool = match me_pool.as_ref() { + Some(pool) => Some(pool.clone()), + None => me_pool_runtime.read().await.clone(), + }; + let initial_ready = match initial_pool.as_ref() { + Some(pool) => pool.admission_ready_conditional_cast().await, + None => false, + }; let mut fallback_enabled = config.general.me2dc_fallback; let mut fast_fallback_enabled = fallback_enabled && config.general.me2dc_fast; let (initial_gate_open, initial_route_mode, initial_fallback_reason) = if initial_ready { (true, RelayRouteMode::Middle, None) - } else if fast_fallback_enabled { + } else if fallback_enabled { ( true, RelayRouteMode::Direct, - Some("fast_not_ready_fallback"), + Some("startup_direct_fallback"), ) } else { (false, RelayRouteMode::Middle, None) @@ -49,7 +57,8 @@ pub(crate) async fn 
configure_admission_gate( warn!("Conditional-admission gate: closed / ME pool is NOT ready)"); } - let pool_for_gate = pool.clone(); + let mut pool_for_gate = initial_pool; + let pool_runtime_for_gate = me_pool_runtime.clone(); let admission_tx_gate = admission_tx.clone(); let route_runtime_gate = route_runtime.clone(); let mut config_rx_gate = config_rx.clone(); @@ -83,12 +92,27 @@ pub(crate) async fn configure_admission_gate( } _ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {} } - let ready = pool_for_gate.admission_ready_conditional_cast().await; + if pool_for_gate.is_none() { + pool_for_gate = pool_runtime_for_gate.read().await.clone(); + } + let ready = match pool_for_gate.as_ref() { + Some(pool) => pool.admission_ready_conditional_cast().await, + None => false, + }; let now = Instant::now(); let (next_gate_open, next_route_mode, next_fallback_reason) = if ready { ready_observed = true; not_ready_since = None; + if let Some(pool) = pool_for_gate.as_ref() { + pool.set_runtime_ready(true); + } (true, RelayRouteMode::Middle, None) + } else if fallback_enabled && !ready_observed { + ( + true, + RelayRouteMode::Direct, + Some("startup_direct_fallback"), + ) } else if fast_fallback_enabled { ( true, @@ -122,7 +146,14 @@ pub(crate) async fn configure_admission_gate( ); } else { let fallback_reason = next_fallback_reason.unwrap_or("unknown"); - if fallback_reason == "strict_grace_fallback" { + if fallback_reason == "startup_direct_fallback" { + warn!( + target_mode = route_mode.as_str(), + cutover_generation = snapshot.generation, + fallback_reason, + "ME pool not-ready during startup; routing new sessions via Direct-DC" + ); + } else if fallback_reason == "strict_grace_fallback" { let fallback_after = if ready_observed { RUNTIME_FALLBACK_AFTER } else { diff --git a/src/maestro/listeners.rs b/src/maestro/listeners.rs index 0247937..d47e1a4 100644 --- a/src/maestro/listeners.rs +++ b/src/maestro/listeners.rs @@ -6,7 +6,7 @@ use std::time::Duration; 
use tokio::net::TcpListener; #[cfg(unix)] use tokio::net::UnixListener; -use tokio::sync::{Semaphore, watch}; +use tokio::sync::{RwLock, Semaphore, watch}; use tracing::{debug, error, info, warn}; use crate::config::{ProxyConfig, RstOnCloseMode}; @@ -63,6 +63,7 @@ pub(crate) async fn bind_listeners( buffer_pool: Arc, rng: Arc, me_pool: Option>, + me_pool_runtime: Arc>>>, route_runtime: Arc, tls_cache: Option>, ip_tracker: Arc, @@ -236,6 +237,7 @@ pub(crate) async fn bind_listeners( let buffer_pool = buffer_pool.clone(); let rng = rng.clone(); let me_pool = me_pool.clone(); + let me_pool_runtime = me_pool_runtime.clone(); let route_runtime = route_runtime.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); @@ -298,6 +300,7 @@ pub(crate) async fn bind_listeners( let buffer_pool = buffer_pool.clone(); let rng = rng.clone(); let me_pool = me_pool.clone(); + let me_pool_runtime = me_pool_runtime.clone(); let route_runtime = route_runtime.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); @@ -307,7 +310,8 @@ pub(crate) async fn bind_listeners( tokio::spawn(async move { let _permit = permit; - if let Err(e) = crate::proxy::client::handle_client_stream_with_shared( + if let Err(e) = + crate::proxy::client::handle_client_stream_with_shared_and_pool_runtime( stream, fake_peer, config, @@ -317,6 +321,7 @@ pub(crate) async fn bind_listeners( buffer_pool, rng, me_pool, + Some(me_pool_runtime), route_runtime, tls_cache, ip_tracker, @@ -367,6 +372,7 @@ pub(crate) fn spawn_tcp_accept_loops( buffer_pool: Arc, rng: Arc, me_pool: Option>, + me_pool_runtime: Arc>>>, route_runtime: Arc, tls_cache: Option>, ip_tracker: Arc, @@ -383,6 +389,7 @@ pub(crate) fn spawn_tcp_accept_loops( let buffer_pool = buffer_pool.clone(); let rng = rng.clone(); let me_pool = me_pool.clone(); + let me_pool_runtime = me_pool_runtime.clone(); let route_runtime = route_runtime.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = 
ip_tracker.clone(); @@ -449,6 +456,7 @@ pub(crate) fn spawn_tcp_accept_loops( let buffer_pool = buffer_pool.clone(); let rng = rng.clone(); let me_pool = me_pool.clone(); + let me_pool_runtime = me_pool_runtime.clone(); let route_runtime = route_runtime.clone(); let tls_cache = tls_cache.clone(); let ip_tracker = ip_tracker.clone(); @@ -470,6 +478,7 @@ pub(crate) fn spawn_tcp_accept_loops( buffer_pool, rng, me_pool, + Some(me_pool_runtime), route_runtime, tls_cache, ip_tracker, diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index 7baebd8..3a957e5 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -36,10 +36,10 @@ use crate::network::probe::{decide_network_capabilities, log_probe_result, run_p use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController}; use crate::proxy::shared_state::ProxySharedState; use crate::startup::{ - COMPONENT_API_BOOTSTRAP, COMPONENT_CONFIG_LOAD, COMPONENT_ME_POOL_CONSTRUCT, - COMPONENT_ME_POOL_INIT_STAGE1, COMPONENT_ME_PROXY_CONFIG_V4, COMPONENT_ME_PROXY_CONFIG_V6, - COMPONENT_ME_SECRET_FETCH, COMPONENT_NETWORK_PROBE, COMPONENT_TRACING_INIT, StartupMeStatus, - StartupTracker, + COMPONENT_API_BOOTSTRAP, COMPONENT_CONFIG_LOAD, COMPONENT_DC_CONNECTIVITY_PING, + COMPONENT_ME_CONNECTIVITY_PING, COMPONENT_ME_POOL_CONSTRUCT, COMPONENT_ME_POOL_INIT_STAGE1, + COMPONENT_ME_PROXY_CONFIG_V4, COMPONENT_ME_PROXY_CONFIG_V6, COMPONENT_ME_SECRET_FETCH, + COMPONENT_NETWORK_PROBE, COMPONENT_TRACING_INIT, StartupMeStatus, StartupTracker, }; use crate::stats::beobachten::BeobachtenStore; use crate::stats::telemetry::TelemetryPolicy; @@ -461,12 +461,14 @@ async fn run_telemt_core( let (api_config_tx, api_config_rx) = watch::channel(Arc::new(config.clone())); let (detected_ips_tx, detected_ips_rx) = watch::channel((None::, None::)); - let initial_admission_open = !config.general.use_middle_proxy; + let initial_direct_first = + config.general.use_middle_proxy && config.general.me2dc_fallback; + let initial_admission_open = 
!config.general.use_middle_proxy || initial_direct_first; let (admission_tx, admission_rx) = watch::channel(initial_admission_open); - let initial_route_mode = if config.general.use_middle_proxy { - RelayRouteMode::Middle - } else { + let initial_route_mode = if !config.general.use_middle_proxy || initial_direct_first { RelayRouteMode::Direct + } else { + RelayRouteMode::Middle }; let route_runtime = Arc::new(RouteRuntimeController::new(initial_route_mode)); let api_me_pool = Arc::new(RwLock::new(None::>)); @@ -602,8 +604,9 @@ async fn run_telemt_core( let me_init_retry_attempts = config.general.me_init_retry_attempts; if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me { if me2dc_fallback { - warn!("No usable IP family for Middle Proxy detected; falling back to direct DC"); - use_middle_proxy = false; + warn!( + "No usable IP family for Middle Proxy detected; Direct-DC startup fallback is active while ME init retries continue" + ); } else { warn!( "No usable IP family for Middle Proxy detected; me2dc_fallback=false, ME init retries stay active" @@ -665,23 +668,32 @@ async fn run_telemt_core( } let (me_ready_tx, me_ready_rx) = watch::channel(0_u64); + let direct_first_startup = use_middle_proxy && me2dc_fallback; - let me_pool: Option> = me_startup::initialize_me_pool( - use_middle_proxy, - &config, - &decision, - &probe, - &startup_tracker, - upstream_manager.clone(), - rng.clone(), - stats.clone(), - api_me_pool.clone(), - me_ready_tx.clone(), - ) - .await; + let me_pool: Option> = if direct_first_startup { + None + } else { + me_startup::initialize_me_pool( + use_middle_proxy, + &config, + &decision, + &probe, + &startup_tracker, + upstream_manager.clone(), + rng.clone(), + stats.clone(), + api_me_pool.clone(), + me_ready_tx.clone(), + ) + .await + }; // If ME failed to initialize, force direct-only mode. 
- if me_pool.is_some() { + if direct_first_startup { + startup_tracker.set_transport_mode("direct").await; + startup_tracker.set_degraded(true).await; + info!("Transport: Direct DC startup fallback active; Middle-End bootstrap continues in background"); + } else if me_pool.is_some() { startup_tracker.set_transport_mode("middle_proxy").await; startup_tracker.set_degraded(false).await; info!("Transport: Middle-End Proxy - all DC-over-RPC"); @@ -719,18 +731,33 @@ async fn run_telemt_core( config.access.cidr_rate_limits.clone(), ); - connectivity::run_startup_connectivity( - &config, - &me_pool, - rng.clone(), - &startup_tracker, - upstream_manager.clone(), - prefer_ipv6, - &decision, - process_started_at, - api_me_pool.clone(), - ) - .await; + if direct_first_startup { + startup_tracker + .skip_component( + COMPONENT_ME_CONNECTIVITY_PING, + Some("deferred by direct-first startup".to_string()), + ) + .await; + startup_tracker + .skip_component( + COMPONENT_DC_CONNECTIVITY_PING, + Some("background health checks active".to_string()), + ) + .await; + } else { + connectivity::run_startup_connectivity( + &config, + &me_pool, + rng.clone(), + &startup_tracker, + upstream_manager.clone(), + prefer_ipv6, + &decision, + process_started_at, + api_me_pool.clone(), + ) + .await; + } let runtime_watches = runtime_tasks::spawn_runtime_tasks( &config, @@ -758,9 +785,70 @@ async fn run_telemt_core( let detected_ip_v4 = runtime_watches.detected_ip_v4; let detected_ip_v6 = runtime_watches.detected_ip_v6; + if direct_first_startup { + let config_bg = config.clone(); + let decision_bg = decision.clone(); + let probe_bg = probe.clone(); + let startup_tracker_bg = startup_tracker.clone(); + let upstream_manager_bg = upstream_manager.clone(); + let rng_bg = rng.clone(); + let stats_bg = stats.clone(); + let api_me_pool_bg = api_me_pool.clone(); + let me_ready_tx_bg = me_ready_tx.clone(); + let config_rx_bg = config_rx.clone(); + tokio::spawn(async move { + let mut bootstrap_attempt: u32 = 0; 
+ loop { + bootstrap_attempt = bootstrap_attempt.saturating_add(1); + let pool = me_startup::initialize_me_pool( + true, + config_bg.as_ref(), + &decision_bg, + &probe_bg, + &startup_tracker_bg, + upstream_manager_bg.clone(), + rng_bg.clone(), + stats_bg.clone(), + api_me_pool_bg.clone(), + me_ready_tx_bg.clone(), + ) + .await; + if let Some(pool) = pool { + runtime_tasks::spawn_middle_proxy_runtime_tasks( + config_bg.as_ref(), + config_rx_bg, + pool, + rng_bg, + me_ready_tx_bg, + ); + break; + } + if me_init_retry_attempts > 0 && bootstrap_attempt >= me_init_retry_attempts { + break; + } + tokio::time::sleep(Duration::from_secs(2)).await; + } + }); + + let startup_tracker_ready = startup_tracker.clone(); + let api_me_pool_ready = api_me_pool.clone(); + let mut me_ready_rx_transport = me_ready_tx.subscribe(); + tokio::spawn(async move { + if me_ready_rx_transport.changed().await.is_ok() { + if let Some(pool) = api_me_pool_ready.read().await.as_ref() { + pool.set_runtime_ready(true); + } + startup_tracker_ready.set_transport_mode("middle_proxy").await; + startup_tracker_ready.set_degraded(false).await; + info!("Transport: Middle-End Proxy restored for new sessions"); + } + }); + } + admission::configure_admission_gate( &config, me_pool.clone(), + api_me_pool.clone(), route_runtime.clone(), &admission_tx, config_rx.clone(), @@ -789,6 +877,7 @@ async fn run_telemt_core( buffer_pool.clone(), rng.clone(), me_pool.clone(), + api_me_pool.clone(), route_runtime.clone(), tls_cache.clone(), ip_tracker.clone(), @@ -843,6 +932,7 @@ async fn run_telemt_core( buffer_pool.clone(), rng.clone(), me_pool.clone(), + api_me_pool.clone(), route_runtime.clone(), tls_cache.clone(), ip_tracker.clone(), diff --git a/src/maestro/runtime_tasks.rs b/src/maestro/runtime_tasks.rs index 7125b04..930acda 100644 --- a/src/maestro/runtime_tasks.rs +++ b/src/maestro/runtime_tasks.rs @@ -257,45 +257,7 @@ pub(crate) async fn spawn_runtime_tasks( }); if let Some(pool) = me_pool { - let 
reinit_trigger_capacity = config.general.me_reinit_trigger_channel.max(1); - let (reinit_tx, reinit_rx) = mpsc::channel::(reinit_trigger_capacity); - - let pool_clone_sched = pool.clone(); - let rng_clone_sched = rng.clone(); - let config_rx_clone_sched = config_rx.clone(); - let me_ready_tx_sched = me_ready_tx.clone(); - tokio::spawn(async move { - crate::transport::middle_proxy::me_reinit_scheduler( - pool_clone_sched, - rng_clone_sched, - config_rx_clone_sched, - reinit_rx, - me_ready_tx_sched, - ) - .await; - }); - - let pool_clone = pool.clone(); - let config_rx_clone = config_rx.clone(); - let reinit_tx_updater = reinit_tx.clone(); - tokio::spawn(async move { - crate::transport::middle_proxy::me_config_updater( - pool_clone, - config_rx_clone, - reinit_tx_updater, - ) - .await; - }); - - let config_rx_clone_rot = config_rx.clone(); - let reinit_tx_rotation = reinit_tx.clone(); - tokio::spawn(async move { - crate::transport::middle_proxy::me_rotation_task( - config_rx_clone_rot, - reinit_tx_rotation, - ) - .await; - }); + spawn_middle_proxy_runtime_tasks(config, config_rx.clone(), pool, rng, me_ready_tx); } RuntimeWatches { @@ -306,6 +268,51 @@ pub(crate) async fn spawn_runtime_tasks( } } +pub(crate) fn spawn_middle_proxy_runtime_tasks( + config: &ProxyConfig, + config_rx: watch::Receiver>, + pool: Arc, + rng: Arc, + me_ready_tx: watch::Sender, +) { + let reinit_trigger_capacity = config.general.me_reinit_trigger_channel.max(1); + let (reinit_tx, reinit_rx) = mpsc::channel::(reinit_trigger_capacity); + + let pool_clone_sched = pool.clone(); + let rng_clone_sched = rng.clone(); + let config_rx_clone_sched = config_rx.clone(); + let me_ready_tx_sched = me_ready_tx.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_reinit_scheduler( + pool_clone_sched, + rng_clone_sched, + config_rx_clone_sched, + reinit_rx, + me_ready_tx_sched, + ) + .await; + }); + + let pool_clone = pool.clone(); + let config_rx_clone = config_rx.clone(); + let 
reinit_tx_updater = reinit_tx.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_config_updater( + pool_clone, + config_rx_clone, + reinit_tx_updater, + ) + .await; + }); + + let config_rx_clone_rot = config_rx.clone(); + let reinit_tx_rotation = reinit_tx.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_rotation_task(config_rx_clone_rot, reinit_tx_rotation) + .await; + }); +} + pub(crate) async fn apply_runtime_log_filter( has_rust_log: bool, effective_log_level: &LogLevel, diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 136efc8..a8357b7 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -11,6 +11,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; use tokio::net::TcpStream; +use tokio::sync::RwLock; use tokio::time::timeout; use tracing::{debug, warn}; @@ -452,7 +453,50 @@ where } #[allow(clippy::too_many_arguments)] +#[allow(dead_code)] pub async fn handle_client_stream_with_shared( + stream: S, + peer: SocketAddr, + config: Arc, + stats: Arc, + upstream_manager: Arc, + replay_checker: Arc, + buffer_pool: Arc, + rng: Arc, + me_pool: Option>, + route_runtime: Arc, + tls_cache: Option>, + ip_tracker: Arc, + beobachten: Arc, + shared: Arc, + proxy_protocol_enabled: bool, +) -> Result<()> +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + handle_client_stream_with_shared_and_pool_runtime( + stream, + peer, + config, + stats, + upstream_manager, + replay_checker, + buffer_pool, + rng, + me_pool, + None, + route_runtime, + tls_cache, + ip_tracker, + beobachten, + shared, + proxy_protocol_enabled, + ) + .await +} + +#[allow(clippy::too_many_arguments)] +pub async fn handle_client_stream_with_shared_and_pool_runtime( mut stream: S, peer: SocketAddr, config: Arc, @@ -462,6 +506,7 @@ pub async fn handle_client_stream_with_shared( buffer_pool: Arc, rng: Arc, me_pool: Option>, + me_pool_runtime: Option>>>>, 
route_runtime: Arc, tls_cache: Option>, ip_tracker: Arc, @@ -731,6 +776,7 @@ where RunningClientHandler::handle_authenticated_static_with_shared( crypto_reader, crypto_writer, success, upstream_manager, stats, config, buffer_pool, rng, me_pool, + me_pool_runtime, route_runtime.clone(), local_addr, real_peer, ip_tracker.clone(), shared.clone(), @@ -791,6 +837,7 @@ where buffer_pool, rng, me_pool, + me_pool_runtime, route_runtime.clone(), local_addr, real_peer, @@ -846,6 +893,7 @@ pub struct RunningClientHandler { buffer_pool: Arc, rng: Arc, me_pool: Option>, + me_pool_runtime: Option>>>>, route_runtime: Arc, tls_cache: Option>, ip_tracker: Arc, @@ -891,6 +939,7 @@ impl ClientHandler { buffer_pool, rng, me_pool, + None, route_runtime, tls_cache, ip_tracker, @@ -915,6 +964,7 @@ impl ClientHandler { buffer_pool: Arc, rng: Arc, me_pool: Option>, + me_pool_runtime: Option>>>>, route_runtime: Arc, tls_cache: Option>, ip_tracker: Arc, @@ -938,6 +988,7 @@ impl ClientHandler { buffer_pool, rng, me_pool, + me_pool_runtime, route_runtime, tls_cache, ip_tracker, @@ -1345,6 +1396,7 @@ impl RunningClientHandler { buffer_pool, self.rng, self.me_pool, + self.me_pool_runtime, self.route_runtime.clone(), local_addr, peer, @@ -1429,6 +1481,7 @@ impl RunningClientHandler { buffer_pool, self.rng, self.me_pool, + self.me_pool_runtime, self.route_runtime.clone(), local_addr, peer, @@ -1472,6 +1525,7 @@ impl RunningClientHandler { buffer_pool, rng, me_pool, + None, route_runtime, local_addr, peer_addr, @@ -1491,6 +1545,7 @@ impl RunningClientHandler { buffer_pool: Arc, rng: Arc, me_pool: Option>, + me_pool_runtime: Option>>>>, route_runtime: Arc, local_addr: SocketAddr, peer_addr: SocketAddr, @@ -1521,15 +1576,29 @@ impl RunningClientHandler { let route_snapshot = route_runtime.snapshot(); let session_id = rng.u64(); - let relay_result = if config.general.use_middle_proxy + let selected_me_pool = if config.general.use_middle_proxy && matches!(route_snapshot.mode, RelayRouteMode::Middle) { 
if let Some(ref pool) = me_pool { + Some(pool.clone()) + } else if let Some(pool_runtime) = me_pool_runtime.as_ref() { + pool_runtime.read().await.clone() + } else { + None + } + } else { + None + }; + + let relay_result = if config.general.use_middle_proxy + && matches!(route_snapshot.mode, RelayRouteMode::Middle) + { + if let Some(pool) = selected_me_pool { handle_via_middle_proxy( client_reader, client_writer, success, - pool.clone(), + pool, stats.clone(), config, buffer_pool, diff --git a/src/proxy/masking.rs b/src/proxy/masking.rs index b2a8e84..3cd8799 100644 --- a/src/proxy/masking.rs +++ b/src/proxy/masking.rs @@ -47,6 +47,12 @@ struct CopyOutcome { ended_by_eof: bool, } +#[derive(Clone, Copy)] +struct MaskTcpTarget<'a> { + host: &'a str, + port: u16, +} + async fn copy_with_idle_timeout( reader: &mut R, writer: &mut W, @@ -331,7 +337,9 @@ async fn wait_mask_outcome_budget(started: Instant, config: &ProxyConfig) { #[cfg(test)] mod tls_domain_mask_host_tests { - use super::{mask_host_for_initial_data, matching_tls_domain_for_sni}; + use super::{ + mask_host_for_initial_data, mask_tcp_target_for_initial_data, matching_tls_domain_for_sni, + }; use crate::config::ProxyConfig; fn client_hello_with_sni(sni_host: &str) -> Vec { @@ -410,6 +418,25 @@ mod tls_domain_mask_host_tests { assert_eq!(mask_host_for_initial_data(&config, &initial_data), "b.com"); } + + #[test] + fn exclusive_mask_target_overrides_only_matching_sni() { + let mut config = config_with_tls_domains(); + config + .censorship + .exclusive_mask + .insert("b.com".to_string(), "origin-b.example:8443".to_string()); + let b_initial_data = client_hello_with_sni("B.COM"); + let c_initial_data = client_hello_with_sni("c.com"); + + let b_target = mask_tcp_target_for_initial_data(&config, &b_initial_data); + let c_target = mask_tcp_target_for_initial_data(&config, &c_initial_data); + + assert_eq!(b_target.host, "origin-b.example"); + assert_eq!(b_target.port, 8443); + assert_eq!(c_target.host, "c.com"); + 
assert_eq!(c_target.port, config.censorship.mask_port); + } } /// Detect client type based on initial data @@ -458,7 +485,61 @@ fn matching_tls_domain_for_sni<'a>(config: &'a ProxyConfig, sni: &str) -> Option None } +fn parse_exclusive_mask_target(target: &str) -> Option> { + let target = target.trim(); + if target.is_empty() { + return None; + } + + if target.starts_with('[') { + let end = target.find(']')?; + if target.get(end + 1..end + 2)? != ":" { + return None; + } + let port = target[end + 2..].parse::().ok()?; + return (port > 0).then_some(MaskTcpTarget { + host: &target[..=end], + port, + }); + } + + let (host, port) = target.rsplit_once(':')?; + if host.is_empty() || host.contains(':') { + return None; + } + let port = port.parse::().ok()?; + (port > 0).then_some(MaskTcpTarget { host, port }) +} + +fn exclusive_mask_target_for_sni<'a>( + config: &'a ProxyConfig, + sni: &str, +) -> Option> { + for (domain, target) in &config.censorship.exclusive_mask { + if domain.eq_ignore_ascii_case(sni) { + return parse_exclusive_mask_target(target); + } + } + + None +} + +#[cfg(test)] fn mask_host_for_initial_data<'a>(config: &'a ProxyConfig, initial_data: &[u8]) -> &'a str { + mask_tcp_target_for_initial_data(config, initial_data).host +} + +fn mask_tcp_target_for_initial_data<'a>( + config: &'a ProxyConfig, + initial_data: &[u8], +) -> MaskTcpTarget<'a> { + if let Some(target) = tls::extract_sni_from_client_hello(initial_data) + .as_deref() + .and_then(|sni| exclusive_mask_target_for_sni(config, sni)) + { + return target; + } + let configured_mask_host = config .censorship .mask_host @@ -466,13 +547,20 @@ fn mask_host_for_initial_data<'a>(config: &'a ProxyConfig, initial_data: &[u8]) .unwrap_or(&config.censorship.tls_domain); if !configured_mask_host.eq_ignore_ascii_case(&config.censorship.tls_domain) { - return configured_mask_host; + return MaskTcpTarget { + host: configured_mask_host, + port: config.censorship.mask_port, + }; } - 
tls::extract_sni_from_client_hello(initial_data) + let host = tls::extract_sni_from_client_hello(initial_data) .as_deref() .and_then(|sni| matching_tls_domain_for_sni(config, sni)) - .unwrap_or(configured_mask_host) + .unwrap_or(configured_mask_host); + MaskTcpTarget { + host, + port: config.censorship.mask_port, + } } fn canonical_ip(ip: IpAddr) -> IpAddr { @@ -770,9 +858,15 @@ pub async fn handle_bad_client( return; } + let exclusive_tcp_target = tls::extract_sni_from_client_hello(initial_data) + .as_deref() + .and_then(|sni| exclusive_mask_target_for_sni(config, sni)); + // Connect via Unix socket or TCP #[cfg(unix)] - if let Some(ref sock_path) = config.censorship.mask_unix_sock { + if exclusive_tcp_target.is_none() + && let Some(ref sock_path) = config.censorship.mask_unix_sock + { let outcome_started = Instant::now(); let connect_started = Instant::now(); debug!( @@ -849,8 +943,10 @@ pub async fn handle_bad_client( return; } - let mask_host = mask_host_for_initial_data(config, initial_data); - let mask_port = config.censorship.mask_port; + let mask_target = exclusive_tcp_target + .unwrap_or_else(|| mask_tcp_target_for_initial_data(config, initial_data)); + let mask_host = mask_target.host; + let mask_port = mask_target.port; // Fail closed when fallback points at our own listener endpoint. 
// Self-referential masking can create recursive proxy loops under diff --git a/src/transport/middle_proxy/pool_init.rs b/src/transport/middle_proxy/pool_init.rs index 3f7cad7..3eee903 100644 --- a/src/transport/middle_proxy/pool_init.rs +++ b/src/transport/middle_proxy/pool_init.rs @@ -19,12 +19,14 @@ impl MePool { .me_reconnect_max_concurrent_per_dc .max(1) as usize; let ks = self.key_selector().await; + let me_servers = self.proxy_map_v4.read().await.len(); + let secret_len = self.proxy_secret.read().await.secret.len(); info!( - me_servers = self.proxy_map_v4.read().await.len(), + me_servers, pool_size, connect_concurrency, key_selector = format_args!("0x{ks:08x}"), - secret_len = self.proxy_secret.read().await.secret.len(), + secret_len, "Initializing ME pool" );
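Review note on the `src/proxy/masking.rs` hunk above: the `exclusive_mask` target parsing can be read in isolation as the sketch below. It follows the added `parse_exclusive_mask_target` body as shown in the diff, but the generic parameters did not survive in this rendering, so `Option<MaskTcpTarget<'_>>` and `parse::<u16>()` are assumptions inferred from the `port: u16` field. The `Debug` and `PartialEq` derives are also additions made here only so the result can be compared in assertions.

```rust
/// Borrowed host/port pair, mirroring the `MaskTcpTarget` struct added in the diff.
/// `Debug` and `PartialEq` are assumptions added for illustration only.
#[derive(Clone, Copy, Debug, PartialEq)]
struct MaskTcpTarget<'a> {
    host: &'a str,
    port: u16,
}

/// Parse `host:port` or `[ipv6]:port`. Returns `None` for empty input,
/// a missing or zero port, or an unbracketed host that contains `:`.
fn parse_exclusive_mask_target(target: &str) -> Option<MaskTcpTarget<'_>> {
    let target = target.trim();
    if target.is_empty() {
        return None;
    }

    // Bracketed form: the host keeps its brackets, and the closing bracket
    // must be followed by exactly one `:` and then the port.
    if target.starts_with('[') {
        let end = target.find(']')?;
        if target.get(end + 1..end + 2)? != ":" {
            return None;
        }
        let port = target[end + 2..].parse::<u16>().ok()?;
        return (port > 0).then_some(MaskTcpTarget {
            host: &target[..=end],
            port,
        });
    }

    // Plain form: split on the last `:` and reject hosts that still contain
    // a colon, which would be an ambiguous unbracketed IPv6 literal.
    let (host, port) = target.rsplit_once(':')?;
    if host.is_empty() || host.contains(':') {
        return None;
    }
    let port = port.parse::<u16>().ok()?;
    (port > 0).then_some(MaskTcpTarget { host, port })
}
```

With this shape, `origin-b.example:8443` yields host `origin-b.example` and port `8443`, matching the expectation in the added `exclusive_mask_target_overrides_only_matching_sni` test, while a malformed entry returns `None`. Per the `mask_tcp_target_for_initial_data` hunk, a `None` here appears to fall through to the regular `mask_host` / `mask_port` selection rather than failing the connection.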