From 5c0eb6dbe8677bd077e5ea00e183711f5f283f34 Mon Sep 17 00:00:00 2001
From: Alexey <247128645+axkurcom@users.noreply.github.com>
Date: Fri, 20 Mar 2026 16:05:24 +0300
Subject: [PATCH 1/2] TLS Fetcher Upstream Selection

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
---
 src/config/defaults.rs       |  4 +++
 src/config/hot_reload.rs     |  1 +
 src/config/load.rs           | 56 ++++++++++++++++++++++++++++++++++++
 src/config/types.rs          |  6 ++++
 src/maestro/tls_bootstrap.rs |  8 ++++++
 src/tls_front/fetcher.rs     | 16 ++++++++---
 6 files changed, 87 insertions(+), 4 deletions(-)

diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index fea8305..be540b0 100644
--- a/src/config/defaults.rs
+++ b/src/config/defaults.rs
@@ -65,6 +65,10 @@ pub(crate) fn default_tls_domain() -> String {
     "petrovich.ru".to_string()
 }
 
+pub(crate) fn default_tls_fetch_scope() -> String {
+    String::new()
+}
+
 pub(crate) fn default_mask_port() -> u16 {
     443
 }
diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs
index 1315f9c..4cf7676 100644
--- a/src/config/hot_reload.rs
+++ b/src/config/hot_reload.rs
@@ -623,6 +623,7 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
     }
     if old.censorship.tls_domain != new.censorship.tls_domain
         || old.censorship.tls_domains != new.censorship.tls_domains
+        || old.censorship.tls_fetch_scope != new.censorship.tls_fetch_scope
         || old.censorship.mask != new.censorship.mask
         || old.censorship.mask_host != new.censorship.mask_host
         || old.censorship.mask_port != new.censorship.mask_port
diff --git a/src/config/load.rs b/src/config/load.rs
index 14799ed..fbd2b33 100644
--- a/src/config/load.rs
+++ b/src/config/load.rs
@@ -779,6 +779,9 @@ impl ProxyConfig {
             config.censorship.mask_host = Some(config.censorship.tls_domain.clone());
         }
 
+        // Normalize optional TLS fetch scope: whitespace-only values disable scoped routing.
+        config.censorship.tls_fetch_scope = config.censorship.tls_fetch_scope.trim().to_string();
+
         // Merge primary + extra TLS domains, deduplicate (primary always first).
         if !config.censorship.tls_domains.is_empty() {
             let mut all = Vec::with_capacity(1 + config.censorship.tls_domains.len());
@@ -2097,6 +2100,59 @@ mod tests {
         let _ = std::fs::remove_file(path);
     }
 
+    #[test]
+    fn tls_fetch_scope_default_is_empty() {
+        let toml = r#"
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_tls_fetch_scope_default_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let cfg = ProxyConfig::load(&path).unwrap();
+        assert!(cfg.censorship.tls_fetch_scope.is_empty());
+        let _ = std::fs::remove_file(path);
+    }
+
+    #[test]
+    fn tls_fetch_scope_is_trimmed_during_load() {
+        let toml = r#"
+            [censorship]
+            tls_domain = "example.com"
+            tls_fetch_scope = " me "
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_tls_fetch_scope_trim_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let cfg = ProxyConfig::load(&path).unwrap();
+        assert_eq!(cfg.censorship.tls_fetch_scope, "me");
+        let _ = std::fs::remove_file(path);
+    }
+
+    #[test]
+    fn tls_fetch_scope_whitespace_becomes_empty() {
+        let toml = r#"
+            [censorship]
+            tls_domain = "example.com"
+            tls_fetch_scope = " "
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_tls_fetch_scope_blank_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let cfg = ProxyConfig::load(&path).unwrap();
+        assert!(cfg.censorship.tls_fetch_scope.is_empty());
+        let _ = std::fs::remove_file(path);
+    }
+
     #[test]
     fn invalid_ad_tag_is_disabled_during_load() {
         let toml = r#"
diff --git a/src/config/types.rs b/src/config/types.rs
index d018187..c99000d 100644
--- a/src/config/types.rs
+++ b/src/config/types.rs
@@ -1308,6 +1308,11 @@ pub struct AntiCensorshipConfig {
     #[serde(default)]
     pub tls_domains: Vec<String>,
 
+    /// Upstream scope used for TLS front metadata fetches.
+    /// Empty value keeps default upstream routing behavior.
+    #[serde(default = "default_tls_fetch_scope")]
+    pub tls_fetch_scope: String,
+
     #[serde(default = "default_true")]
     pub mask: bool,
 
@@ -1365,6 +1370,7 @@ impl Default for AntiCensorshipConfig {
         Self {
             tls_domain: default_tls_domain(),
             tls_domains: Vec::new(),
+            tls_fetch_scope: default_tls_fetch_scope(),
             mask: default_true(),
             mask_host: None,
             mask_port: default_mask_port(),
diff --git a/src/maestro/tls_bootstrap.rs b/src/maestro/tls_bootstrap.rs
index a0b0b5a..73eec4c 100644
--- a/src/maestro/tls_bootstrap.rs
+++ b/src/maestro/tls_bootstrap.rs
@@ -38,12 +38,15 @@ pub(crate) async fn bootstrap_tls_front(
         .clone()
         .unwrap_or_else(|| config.censorship.tls_domain.clone());
     let mask_unix_sock = config.censorship.mask_unix_sock.clone();
+    let tls_fetch_scope = (!config.censorship.tls_fetch_scope.is_empty())
+        .then(|| config.censorship.tls_fetch_scope.clone());
     let fetch_timeout = Duration::from_secs(5);
 
     let cache_initial = cache.clone();
     let domains_initial = tls_domains.to_vec();
     let host_initial = mask_host.clone();
     let unix_sock_initial = mask_unix_sock.clone();
+    let scope_initial = tls_fetch_scope.clone();
     let upstream_initial = upstream_manager.clone();
     tokio::spawn(async move {
         let mut join = tokio::task::JoinSet::new();
@@ -51,6 +54,7 @@ pub(crate) async fn bootstrap_tls_front(
             let cache_domain = cache_initial.clone();
             let host_domain = host_initial.clone();
             let unix_sock_domain = unix_sock_initial.clone();
+            let scope_domain = scope_initial.clone();
             let upstream_domain = upstream_initial.clone();
             join.spawn(async move {
                 match crate::tls_front::fetcher::fetch_real_tls(
@@ -59,6 +63,7 @@ pub(crate) async fn bootstrap_tls_front(
                     &domain,
                     fetch_timeout,
                     Some(upstream_domain),
+                    scope_domain.as_deref(),
                     proxy_protocol,
                     unix_sock_domain.as_deref(),
                 )
@@ -100,6 +105,7 @@ pub(crate) async fn bootstrap_tls_front(
     let domains_refresh = tls_domains.to_vec();
     let host_refresh = mask_host.clone();
     let unix_sock_refresh = mask_unix_sock.clone();
+    let scope_refresh = tls_fetch_scope.clone();
     let upstream_refresh = upstream_manager.clone();
     tokio::spawn(async move {
         loop {
@@ -112,6 +118,7 @@ pub(crate) async fn bootstrap_tls_front(
                 let cache_domain = cache_refresh.clone();
                 let host_domain = host_refresh.clone();
                 let unix_sock_domain = unix_sock_refresh.clone();
+                let scope_domain = scope_refresh.clone();
                 let upstream_domain = upstream_refresh.clone();
                 join.spawn(async move {
                     match crate::tls_front::fetcher::fetch_real_tls(
@@ -120,6 +127,7 @@ pub(crate) async fn bootstrap_tls_front(
                         &domain,
                         fetch_timeout,
                         Some(upstream_domain),
+                        scope_domain.as_deref(),
                         proxy_protocol,
                         unix_sock_domain.as_deref(),
                     )
diff --git a/src/tls_front/fetcher.rs b/src/tls_front/fetcher.rs
index 38872af..366e5d3 100644
--- a/src/tls_front/fetcher.rs
+++ b/src/tls_front/fetcher.rs
@@ -394,15 +394,17 @@ async fn connect_tcp_with_upstream(
     port: u16,
     connect_timeout: Duration,
     upstream: Option<Arc<UpstreamManager>>,
+    scope: Option<&str>,
 ) -> Result {
     if let Some(manager) = upstream {
         if let Some(addr) = resolve_socket_addr(host, port) {
-            match manager.connect(addr, None, None).await {
+            match manager.connect(addr, None, scope).await {
                 Ok(stream) => return Ok(stream),
                 Err(e) => {
                     warn!(
                         host = %host,
                         port = port,
+                        scope = ?scope,
                         error = %e,
                         "Upstream connect failed, using direct connect"
                     );
@@ -410,12 +412,13 @@ async fn connect_tcp_with_upstream(
                 }
             }
         } else if let Ok(mut addrs) = tokio::net::lookup_host((host, port)).await {
             if let Some(addr) = addrs.find(|a| a.is_ipv4()) {
-                match manager.connect(addr, None, None).await {
+                match manager.connect(addr, None, scope).await {
                     Ok(stream) => return Ok(stream),
                     Err(e) => {
                         warn!(
                             host = %host,
                             port = port,
+                            scope = ?scope,
                             error = %e,
                             "Upstream connect failed, using direct connect"
                         );
@@ -537,6 +540,7 @@ async fn fetch_via_raw_tls(
     sni: &str,
     connect_timeout: Duration,
     upstream: Option<Arc<UpstreamManager>>,
+    scope: Option<&str>,
     proxy_protocol: u8,
     unix_sock: Option<&str>,
 ) -> Result {
@@ -572,7 +576,7 @@ async fn fetch_via_raw_tls(
     #[cfg(not(unix))]
     let _ = unix_sock;
 
-    let stream = connect_tcp_with_upstream(host, port, connect_timeout, upstream).await?;
+    let stream = connect_tcp_with_upstream(host, port, connect_timeout, upstream, scope).await?;
 
     fetch_via_raw_tls_stream(stream, sni, connect_timeout, proxy_protocol).await
 }
@@ -675,6 +679,7 @@ async fn fetch_via_rustls(
     sni: &str,
     connect_timeout: Duration,
     upstream: Option<Arc<UpstreamManager>>,
+    scope: Option<&str>,
     proxy_protocol: u8,
     unix_sock: Option<&str>,
 ) -> Result {
@@ -710,7 +715,7 @@ async fn fetch_via_rustls(
     #[cfg(not(unix))]
     let _ = unix_sock;
 
-    let stream = connect_tcp_with_upstream(host, port, connect_timeout, upstream).await?;
+    let stream = connect_tcp_with_upstream(host, port, connect_timeout, upstream, scope).await?;
 
     fetch_via_rustls_stream(stream, host, sni, proxy_protocol).await
 }
@@ -726,6 +731,7 @@ pub async fn fetch_real_tls(
     sni: &str,
     connect_timeout: Duration,
     upstream: Option<Arc<UpstreamManager>>,
+    scope: Option<&str>,
     proxy_protocol: u8,
     unix_sock: Option<&str>,
 ) -> Result {
@@ -735,6 +741,7 @@ pub async fn fetch_real_tls(
         sni,
         connect_timeout,
         upstream.clone(),
+        scope,
         proxy_protocol,
         unix_sock,
     )
@@ -753,6 +760,7 @@ pub async fn fetch_real_tls(
         sni,
         connect_timeout,
         upstream,
+        scope,
         proxy_protocol,
         unix_sock,
     )

From 269ba537ad893457c3ca11855a3661210b596c3a Mon Sep 17 00:00:00 2001
From: Alexey <247128645+axkurcom@users.noreply.github.com>
Date: Fri, 20 Mar 2026 16:07:12 +0300
Subject: [PATCH 2/2] ME Draining on Dual-Stack

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
---
 src/api/runtime_min.rs                     |  41 +++
 src/transport/middle_proxy/health.rs       | 311 ++++++++++++++++--
 src/transport/middle_proxy/pool.rs         | 238 +++++++++++++-
 src/transport/middle_proxy/pool_refill.rs  |   5 +-
 src/transport/middle_proxy/pool_reinit.rs  |  55 +++-
 .../middle_proxy/pool_runtime_api.rs       |  51 ++-
 6 files changed, 664 insertions(+), 37 deletions(-)

diff --git a/src/api/runtime_min.rs b/src/api/runtime_min.rs
index ae3b23f..3a107dc 100644
--- a/src/api/runtime_min.rs
+++ b/src/api/runtime_min.rs
@@ -154,6 +154,25 @@ pub(super) struct RuntimeMeQualityRouteDropData {
     pub(super) queue_full_high_total: u64,
 }
 
+#[derive(Serialize)]
+pub(super) struct RuntimeMeQualityFamilyStateData {
+    pub(super) family: &'static str,
+    pub(super) state: &'static str,
+    pub(super) state_since_epoch_secs: u64,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub(super) suppressed_until_epoch_secs: Option<u64>,
+    pub(super) fail_streak: u32,
+    pub(super) recover_success_streak: u32,
+}
+
+#[derive(Serialize)]
+pub(super) struct RuntimeMeQualityDrainGateData {
+    pub(super) route_quorum_ok: bool,
+    pub(super) redundancy_ok: bool,
+    pub(super) block_reason: &'static str,
+    pub(super) updated_at_epoch_secs: u64,
+}
+
 #[derive(Serialize)]
 pub(super) struct RuntimeMeQualityDcRttData {
     pub(super) dc: i16,
@@ -169,6 +188,8 @@ pub(super) struct RuntimeMeQualityPayload {
     pub(super) counters: RuntimeMeQualityCountersData,
     pub(super) teardown: RuntimeMeQualityTeardownData,
     pub(super) route_drops: RuntimeMeQualityRouteDropData,
+    pub(super) family_states: Vec<RuntimeMeQualityFamilyStateData>,
+    pub(super) drain_gate: RuntimeMeQualityDrainGateData,
     pub(super) dc_rtt: Vec<RuntimeMeQualityDcRttData>,
 }
 
@@ -409,6 +430,19 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
     };
 
     let status = pool.api_status_snapshot().await;
+    let family_states = pool
+        .api_family_state_snapshot()
+        .into_iter()
+        .map(|entry| RuntimeMeQualityFamilyStateData {
+            family: entry.family,
+            state: entry.state,
+            state_since_epoch_secs: entry.state_since_epoch_secs,
+            suppressed_until_epoch_secs: entry.suppressed_until_epoch_secs,
+            fail_streak: entry.fail_streak,
+            recover_success_streak: entry.recover_success_streak,
+        })
+        .collect();
+    let drain_gate_snapshot = pool.api_drain_gate_snapshot();
     RuntimeMeQualityData {
         enabled: true,
         reason: None,
@@ -430,6 +464,13 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
             queue_full_base_total: shared.stats.get_me_route_drop_queue_full_base(),
             queue_full_high_total: shared.stats.get_me_route_drop_queue_full_high(),
         },
+        family_states,
+        drain_gate: RuntimeMeQualityDrainGateData {
+            route_quorum_ok: drain_gate_snapshot.route_quorum_ok,
+            redundancy_ok: drain_gate_snapshot.redundancy_ok,
+            block_reason: drain_gate_snapshot.block_reason,
+            updated_at_epoch_secs: drain_gate_snapshot.updated_at_epoch_secs,
+        },
         dc_rtt: status
             .dcs
             .into_iter()
diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs
index 30e562b..d53b4ef 100644
--- a/src/transport/middle_proxy/health.rs
+++ b/src/transport/middle_proxy/health.rs
@@ -13,7 +13,7 @@ use crate::network::IpFamily;
 use crate::stats::MeWriterTeardownReason;
 
 use super::MePool;
-use super::pool::MeWriter;
+use super::pool::{MeFamilyRuntimeState, MeWriter};
 
 const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
 #[allow(dead_code)]
@@ -34,6 +34,33 @@ const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8;
 const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256;
 const HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS: u64 = 1;
 const HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS: u64 = 1;
+const FAMILY_SUPPRESS_FAIL_STREAK_THRESHOLD: u32 = 6;
+const FAMILY_SUPPRESS_WINDOW_SECS: u64 = 120;
+const FAMILY_RECOVER_PROBE_INTERVAL_SECS: u64 = 5;
+const FAMILY_RECOVER_SUCCESS_STREAK_REQUIRED: u32 = 3;
+
+#[derive(Debug, Clone)]
+struct FamilyCircuitState {
+    state: MeFamilyRuntimeState,
+    state_since_at: Instant,
+    suppressed_until: Option<Instant>,
+    next_probe_at: Instant,
+    fail_streak: u32,
+    recover_success_streak: u32,
+}
+
+impl FamilyCircuitState {
+    fn new(now: Instant) -> Self {
+        Self {
+            state: MeFamilyRuntimeState::Healthy,
+            state_since_at: now,
+            suppressed_until: None,
+            next_probe_at: now,
+            fail_streak: 0,
+            recover_success_streak: 0,
+        }
+    }
+}
 
 #[derive(Debug, Clone)]
 struct DcFloorPlanEntry {
@@ -73,6 +100,25 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
     let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
     let mut drain_warn_next_allowed: HashMap<i32, Instant> = HashMap::new();
     let mut drain_soft_evict_next_allowed: HashMap<i32, Instant> = HashMap::new();
+    let mut family_v4_circuit = FamilyCircuitState::new(Instant::now());
+    let mut family_v6_circuit = FamilyCircuitState::new(Instant::now());
+    let init_epoch_secs = MePool::now_epoch_secs();
+    pool.set_family_runtime_state(
+        IpFamily::V4,
+        family_v4_circuit.state,
+        init_epoch_secs,
+        0,
+        family_v4_circuit.fail_streak,
+        family_v4_circuit.recover_success_streak,
+    );
+    pool.set_family_runtime_state(
+        IpFamily::V6,
+        family_v6_circuit.state,
+        init_epoch_secs,
+        0,
+        family_v6_circuit.fail_streak,
+        family_v6_circuit.recover_success_streak,
+    );
     let mut degraded_interval = true;
     loop {
         let interval = if degraded_interval {
@@ -88,7 +134,9 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
             &mut drain_soft_evict_next_allowed,
         )
         .await;
-        let v4_degraded = check_family(
+        let now = Instant::now();
+        let now_epoch_secs = MePool::now_epoch_secs();
+        let v4_degraded_raw = check_family(
             IpFamily::V4,
             &pool,
             &rng,
@@ -107,25 +155,53 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
             &mut drain_soft_evict_next_allowed,
         )
        .await;
-        let v6_degraded = check_family(
-            IpFamily::V6,
-            &pool,
-            &rng,
-            &mut backoff,
-            &mut next_attempt,
-            &mut inflight,
-            &mut outage_backoff,
-            &mut outage_next_attempt,
-            &mut single_endpoint_outage,
-            &mut shadow_rotate_deadline,
-            &mut idle_refresh_next_attempt,
-            &mut adaptive_idle_since,
-            &mut adaptive_recover_until,
-            &mut floor_warn_next_allowed,
-            &mut drain_warn_next_allowed,
-            &mut drain_soft_evict_next_allowed,
-        )
-        .await;
+        let v4_degraded = apply_family_circuit_result(
+            &pool,
+            IpFamily::V4,
+            &mut family_v4_circuit,
+            Some(v4_degraded_raw),
+            false,
+            now,
+            now_epoch_secs,
+        );
+
+        let v6_check_ran = should_run_family_check(&mut family_v6_circuit, now);
+        let v6_degraded_raw = if v6_check_ran {
+            check_family(
+                IpFamily::V6,
+                &pool,
+                &rng,
+                &mut backoff,
+                &mut next_attempt,
+                &mut inflight,
+                &mut outage_backoff,
+                &mut outage_next_attempt,
+                &mut single_endpoint_outage,
+                &mut shadow_rotate_deadline,
+                &mut idle_refresh_next_attempt,
+                &mut adaptive_idle_since,
+                &mut adaptive_recover_until,
+                &mut floor_warn_next_allowed,
+                &mut drain_warn_next_allowed,
+                &mut drain_soft_evict_next_allowed,
+            )
+            .await
+        } else {
+            false
+        };
+        let v6_degraded = apply_family_circuit_result(
+            &pool,
+            IpFamily::V6,
+            &mut family_v6_circuit,
+            if v6_check_ran {
+                Some(v6_degraded_raw)
+            } else {
+                None
+            },
+            true,
+            now,
+            now_epoch_secs,
+        );
         degraded_interval = v4_degraded || v6_degraded;
     }
 }
@@ -147,6 +223,148 @@ pub async fn me_drain_timeout_enforcer(pool: Arc<MePool>) {
     }
 }
 
+fn should_run_family_check(circuit: &mut FamilyCircuitState, now: Instant) -> bool {
+    match circuit.state {
+        MeFamilyRuntimeState::Suppressed => {
+            if now < circuit.next_probe_at {
+                return false;
+            }
+            circuit.next_probe_at =
+                now + Duration::from_secs(FAMILY_RECOVER_PROBE_INTERVAL_SECS);
+            true
+        }
+        _ => true,
+    }
+}
+
+fn apply_family_circuit_result(
+    pool: &Arc<MePool>,
+    family: IpFamily,
+    circuit: &mut FamilyCircuitState,
+    degraded: Option<bool>,
+    allow_suppress: bool,
+    now: Instant,
+    now_epoch_secs: u64,
+) -> bool {
+    let Some(degraded) = degraded else {
+        // Preserve suppression state when probe tick is intentionally skipped.
+        return false;
+    };
+
+    let previous_state = circuit.state;
+    match circuit.state {
+        MeFamilyRuntimeState::Suppressed => {
+            if degraded {
+                circuit.fail_streak = circuit.fail_streak.saturating_add(1);
+                circuit.recover_success_streak = 0;
+                let until = now + Duration::from_secs(FAMILY_SUPPRESS_WINDOW_SECS);
+                circuit.suppressed_until = Some(until);
+                circuit.state_since_at = now;
+                warn!(
+                    ?family,
+                    fail_streak = circuit.fail_streak,
+                    suppress_secs = FAMILY_SUPPRESS_WINDOW_SECS,
+                    "ME family remains suppressed due to ongoing failures"
+                );
+            } else {
+                circuit.fail_streak = 0;
+                circuit.recover_success_streak = 1;
+                circuit.state = MeFamilyRuntimeState::Recovering;
+            }
+        }
+        MeFamilyRuntimeState::Recovering => {
+            if degraded {
+                circuit.fail_streak = circuit.fail_streak.saturating_add(1);
+                if allow_suppress {
+                    circuit.state = MeFamilyRuntimeState::Suppressed;
+                    let until = now + Duration::from_secs(FAMILY_SUPPRESS_WINDOW_SECS);
+                    circuit.suppressed_until = Some(until);
+                    circuit.next_probe_at =
+                        now + Duration::from_secs(FAMILY_RECOVER_PROBE_INTERVAL_SECS);
+                    warn!(
+                        ?family,
+                        fail_streak = circuit.fail_streak,
+                        suppress_secs = FAMILY_SUPPRESS_WINDOW_SECS,
+                        "ME family temporarily suppressed after repeated degradation"
+                    );
+                } else {
+                    circuit.state = MeFamilyRuntimeState::Degraded;
+                }
+            } else {
+                circuit.recover_success_streak = circuit.recover_success_streak.saturating_add(1);
+                if circuit.recover_success_streak >= FAMILY_RECOVER_SUCCESS_STREAK_REQUIRED {
+                    circuit.fail_streak = 0;
+                    circuit.recover_success_streak = 0;
+                    circuit.suppressed_until = None;
+                    circuit.state = MeFamilyRuntimeState::Healthy;
+                    info!(
+                        ?family,
+                        "ME family suppression lifted after stable recovery probes"
+                    );
+                }
+            }
+        }
+        _ => {
+            if degraded {
+                circuit.fail_streak = circuit.fail_streak.saturating_add(1);
+                circuit.recover_success_streak = 0;
+                circuit.state = MeFamilyRuntimeState::Degraded;
+                if allow_suppress && circuit.fail_streak >= FAMILY_SUPPRESS_FAIL_STREAK_THRESHOLD {
+                    circuit.state = MeFamilyRuntimeState::Suppressed;
+                    let until = now + Duration::from_secs(FAMILY_SUPPRESS_WINDOW_SECS);
+                    circuit.suppressed_until = Some(until);
+                    circuit.next_probe_at =
+                        now + Duration::from_secs(FAMILY_RECOVER_PROBE_INTERVAL_SECS);
+                    warn!(
+                        ?family,
+                        fail_streak = circuit.fail_streak,
+                        suppress_secs = FAMILY_SUPPRESS_WINDOW_SECS,
+                        "ME family temporarily suppressed after repeated degradation"
+                    );
+                }
+            } else {
+                circuit.fail_streak = 0;
+                circuit.recover_success_streak = 0;
+                circuit.suppressed_until = None;
+                circuit.state = MeFamilyRuntimeState::Healthy;
+            }
+        }
+    }
+
+    if previous_state != circuit.state {
+        circuit.state_since_at = now;
+    }
+
+    let suppressed_until_epoch_secs = circuit
+        .suppressed_until
+        .and_then(|until| {
+            if until > now {
+                Some(
+                    now_epoch_secs
+                        .saturating_add(until.saturating_duration_since(now).as_secs()),
+                )
+            } else {
+                None
+            }
+        })
+        .unwrap_or(0);
+    let state_since_epoch_secs = if previous_state == circuit.state {
+        pool.family_runtime_state_since_epoch_secs(family)
+    } else {
+        now_epoch_secs
+    };
+    pool.set_family_runtime_state(
+        family,
+        circuit.state,
+        state_since_epoch_secs,
+        suppressed_until_epoch_secs,
+        circuit.fail_streak,
+        circuit.recover_success_streak,
+    );
+
+    !matches!(circuit.state, MeFamilyRuntimeState::Suppressed) && degraded
+}
+
 fn draining_writer_timeout_expired(
     pool: &MePool,
     writer: &MeWriter,
@@ -1746,13 +1964,19 @@ mod tests {
     use tokio::sync::mpsc;
     use tokio_util::sync::CancellationToken;
 
-    use super::reap_draining_writers;
+    use super::{
+        FamilyCircuitState, apply_family_circuit_result, reap_draining_writers,
+        should_run_family_check,
+    };
     use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
     use crate::crypto::SecureRandom;
+    use crate::network::IpFamily;
     use crate::network::probe::NetworkDecision;
     use crate::stats::Stats;
     use crate::transport::middle_proxy::codec::WriterCommand;
-    use crate::transport::middle_proxy::pool::{MePool, MeWriter, WriterContour};
+    use crate::transport::middle_proxy::pool::{
+        MeFamilyRuntimeState, MePool, MeWriter, WriterContour,
+    };
     use crate::transport::middle_proxy::registry::ConnMeta;
 
     async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
@@ -1930,4 +2154,47 @@ mod tests {
         assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);
         assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30);
     }
+
+    #[tokio::test]
+    async fn suppressed_family_probe_skip_preserves_suppressed_state() {
+        let pool = make_pool(0).await;
+        let now = Instant::now();
+        let now_epoch_secs = MePool::now_epoch_secs();
+        let suppressed_until_epoch_secs = now_epoch_secs.saturating_add(60);
+        pool.set_family_runtime_state(
+            IpFamily::V6,
+            MeFamilyRuntimeState::Suppressed,
+            now_epoch_secs,
+            suppressed_until_epoch_secs,
+            7,
+            0,
+        );
+
+        let mut circuit = FamilyCircuitState {
+            state: MeFamilyRuntimeState::Suppressed,
+            state_since_at: now,
+            suppressed_until: Some(now + Duration::from_secs(60)),
+            next_probe_at: now + Duration::from_secs(5),
+            fail_streak: 7,
+            recover_success_streak: 0,
+        };
+
+        assert!(!should_run_family_check(&mut circuit, now));
+        assert!(!apply_family_circuit_result(
+            &pool,
+            IpFamily::V6,
+            &mut circuit,
+            None,
+            true,
+            now,
+            now_epoch_secs,
+        ));
+        assert_eq!(circuit.state, MeFamilyRuntimeState::Suppressed);
+        assert_eq!(circuit.fail_streak, 7);
+        assert_eq!(circuit.recover_success_streak, 0);
+        assert_eq!(
+            pool.family_runtime_state(IpFamily::V6),
+            MeFamilyRuntimeState::Suppressed,
+        );
+    }
 }
diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs
index f825058..27bcb07 100644
--- a/src/transport/middle_proxy/pool.rs
+++ b/src/transport/middle_proxy/pool.rs
@@ -74,6 +74,64 @@ impl WriterContour {
     }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(u8)]
+pub(crate) enum MeFamilyRuntimeState {
+    Healthy = 0,
+    Degraded = 1,
+    Suppressed = 2,
+    Recovering = 3,
+}
+
+impl MeFamilyRuntimeState {
+    pub(crate) fn from_u8(value: u8) -> Self {
+        match value {
+            1 => Self::Degraded,
+            2 => Self::Suppressed,
+            3 => Self::Recovering,
+            _ => Self::Healthy,
+        }
+    }
+
+    pub(crate) fn as_str(self) -> &'static str {
+        match self {
+            Self::Healthy => "healthy",
+            Self::Degraded => "degraded",
+            Self::Suppressed => "suppressed",
+            Self::Recovering => "recovering",
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(u8)]
+pub(crate) enum MeDrainGateReason {
+    Open = 0,
+    CoverageQuorum = 1,
+    Redundancy = 2,
+    SuppressionActive = 3,
+}
+
+impl MeDrainGateReason {
+    pub(crate) fn from_u8(value: u8) -> Self {
+        match value {
+            1 => Self::CoverageQuorum,
+            2 => Self::Redundancy,
+            3 => Self::SuppressionActive,
+            _ => Self::Open,
+        }
+    }
+
+    pub(crate) fn as_str(self) -> &'static str {
+        match self {
+            Self::Open => "open",
+            Self::CoverageQuorum => "coverage_quorum",
+            Self::Redundancy => "redundancy",
+            Self::SuppressionActive => "suppression_active",
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct SecretSnapshot {
     pub epoch: u64,
@@ -203,6 +261,20 @@ pub struct MePool {
     pub(super) me_health_interval_ms_unhealthy: AtomicU64,
     pub(super) me_health_interval_ms_healthy: AtomicU64,
     pub(super) me_warn_rate_limit_ms: AtomicU64,
+    pub(super) me_family_v4_runtime_state: AtomicU8,
+    pub(super) me_family_v6_runtime_state: AtomicU8,
+    pub(super) me_family_v4_state_since_epoch_secs: AtomicU64,
+    pub(super) me_family_v6_state_since_epoch_secs: AtomicU64,
+    pub(super) me_family_v4_suppressed_until_epoch_secs: AtomicU64,
+    pub(super) me_family_v6_suppressed_until_epoch_secs: AtomicU64,
+    pub(super) me_family_v4_fail_streak: AtomicU32,
+    pub(super) me_family_v6_fail_streak: AtomicU32,
+    pub(super) me_family_v4_recover_success_streak: AtomicU32,
+    pub(super) me_family_v6_recover_success_streak: AtomicU32,
+    pub(super) me_last_drain_gate_route_quorum_ok: AtomicBool,
+    pub(super) me_last_drain_gate_redundancy_ok: AtomicBool,
+    pub(super) me_last_drain_gate_block_reason: AtomicU8,
+    pub(super) me_last_drain_gate_updated_at_epoch_secs: AtomicU64,
     pub(super) runtime_ready: AtomicBool,
     pool_size: usize,
     pub(super) preferred_endpoints_by_dc: Arc<RwLock<HashMap<i32, Vec<SocketAddr>>>>,
@@ -518,6 +590,20 @@ impl MePool {
             me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
             me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)),
             me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)),
+            me_family_v4_runtime_state: AtomicU8::new(MeFamilyRuntimeState::Healthy as u8),
+            me_family_v6_runtime_state: AtomicU8::new(MeFamilyRuntimeState::Healthy as u8),
+            me_family_v4_state_since_epoch_secs: AtomicU64::new(Self::now_epoch_secs()),
+            me_family_v6_state_since_epoch_secs: AtomicU64::new(Self::now_epoch_secs()),
+            me_family_v4_suppressed_until_epoch_secs: AtomicU64::new(0),
+            me_family_v6_suppressed_until_epoch_secs: AtomicU64::new(0),
+            me_family_v4_fail_streak: AtomicU32::new(0),
+            me_family_v6_fail_streak: AtomicU32::new(0),
+            me_family_v4_recover_success_streak: AtomicU32::new(0),
+            me_family_v6_recover_success_streak: AtomicU32::new(0),
+            me_last_drain_gate_route_quorum_ok: AtomicBool::new(false),
+            me_last_drain_gate_redundancy_ok: AtomicBool::new(false),
+            me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8),
+            me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(Self::now_epoch_secs()),
             runtime_ready: AtomicBool::new(false),
             preferred_endpoints_by_dc: Arc::new(RwLock::new(preferred_endpoints_by_dc)),
         })
@@ -535,6 +621,153 @@ impl MePool {
         self.runtime_ready.load(Ordering::Relaxed)
     }
 
+    pub(super) fn set_family_runtime_state(
+        &self,
+        family: IpFamily,
+        state: MeFamilyRuntimeState,
+        state_since_epoch_secs: u64,
+        suppressed_until_epoch_secs: u64,
+        fail_streak: u32,
+        recover_success_streak: u32,
+    ) {
+        match family {
+            IpFamily::V4 => {
+                self.me_family_v4_runtime_state
+                    .store(state as u8, Ordering::Relaxed);
+                self.me_family_v4_state_since_epoch_secs
+                    .store(state_since_epoch_secs, Ordering::Relaxed);
+                self.me_family_v4_suppressed_until_epoch_secs
+                    .store(suppressed_until_epoch_secs, Ordering::Relaxed);
+                self.me_family_v4_fail_streak
+                    .store(fail_streak, Ordering::Relaxed);
+                self.me_family_v4_recover_success_streak
+                    .store(recover_success_streak, Ordering::Relaxed);
+            }
+            IpFamily::V6 => {
+                self.me_family_v6_runtime_state
+                    .store(state as u8, Ordering::Relaxed);
+                self.me_family_v6_state_since_epoch_secs
+                    .store(state_since_epoch_secs, Ordering::Relaxed);
+                self.me_family_v6_suppressed_until_epoch_secs
+                    .store(suppressed_until_epoch_secs, Ordering::Relaxed);
+                self.me_family_v6_fail_streak
+                    .store(fail_streak, Ordering::Relaxed);
+                self.me_family_v6_recover_success_streak
+                    .store(recover_success_streak, Ordering::Relaxed);
+            }
+        }
+    }
+
+    pub(crate) fn family_runtime_state(&self, family: IpFamily) -> MeFamilyRuntimeState {
+        match family {
+            IpFamily::V4 => MeFamilyRuntimeState::from_u8(
+                self.me_family_v4_runtime_state.load(Ordering::Relaxed),
+            ),
+            IpFamily::V6 => MeFamilyRuntimeState::from_u8(
+                self.me_family_v6_runtime_state.load(Ordering::Relaxed),
+            ),
+        }
+    }
+
+    pub(crate) fn family_runtime_state_since_epoch_secs(&self, family: IpFamily) -> u64 {
+        match family {
+            IpFamily::V4 => self
+                .me_family_v4_state_since_epoch_secs
+                .load(Ordering::Relaxed),
+            IpFamily::V6 => self
+                .me_family_v6_state_since_epoch_secs
+                .load(Ordering::Relaxed),
+        }
+    }
+
+    pub(crate) fn family_suppressed_until_epoch_secs(&self, family: IpFamily) -> u64 {
+        match family {
+            IpFamily::V4 => self
+                .me_family_v4_suppressed_until_epoch_secs
+                .load(Ordering::Relaxed),
+            IpFamily::V6 => self
+                .me_family_v6_suppressed_until_epoch_secs
+                .load(Ordering::Relaxed),
+        }
+    }
+
+    pub(crate) fn family_fail_streak(&self, family: IpFamily) -> u32 {
+        match family {
+            IpFamily::V4 => self.me_family_v4_fail_streak.load(Ordering::Relaxed),
+            IpFamily::V6 => self.me_family_v6_fail_streak.load(Ordering::Relaxed),
+        }
+    }
+
+    pub(crate) fn family_recover_success_streak(&self, family: IpFamily) -> u32 {
+        match family {
+            IpFamily::V4 => self
+                .me_family_v4_recover_success_streak
+                .load(Ordering::Relaxed),
+            IpFamily::V6 => self
+                .me_family_v6_recover_success_streak
+                .load(Ordering::Relaxed),
+        }
+    }
+
+    pub(crate) fn is_family_temporarily_suppressed(
+        &self,
+        family: IpFamily,
+        now_epoch_secs: u64,
+    ) -> bool {
+        self.family_suppressed_until_epoch_secs(family) > now_epoch_secs
+    }
+
+    pub(super) fn family_enabled_for_drain_coverage(
+        &self,
+        family: IpFamily,
+        now_epoch_secs: u64,
+    ) -> bool {
+        let configured = match family {
+            IpFamily::V4 => self.decision.ipv4_me,
+            IpFamily::V6 => self.decision.ipv6_me,
+        };
+        configured && !self.is_family_temporarily_suppressed(family, now_epoch_secs)
+    }
+
+    pub(super) fn set_last_drain_gate(
+        &self,
+        route_quorum_ok: bool,
+        redundancy_ok: bool,
+        block_reason: MeDrainGateReason,
+        updated_at_epoch_secs: u64,
+    ) {
+        self.me_last_drain_gate_route_quorum_ok
+            .store(route_quorum_ok, Ordering::Relaxed);
+        self.me_last_drain_gate_redundancy_ok
+            .store(redundancy_ok, Ordering::Relaxed);
+        self.me_last_drain_gate_block_reason
+            .store(block_reason as u8, Ordering::Relaxed);
+        self.me_last_drain_gate_updated_at_epoch_secs
+            .store(updated_at_epoch_secs, Ordering::Relaxed);
+    }
+
+    pub(crate) fn last_drain_gate_route_quorum_ok(&self) -> bool {
+        self.me_last_drain_gate_route_quorum_ok
+            .load(Ordering::Relaxed)
+    }
+
+    pub(crate) fn last_drain_gate_redundancy_ok(&self) -> bool {
+        self.me_last_drain_gate_redundancy_ok
+            .load(Ordering::Relaxed)
+    }
+
+    pub(crate) fn last_drain_gate_block_reason(&self) -> MeDrainGateReason {
+        MeDrainGateReason::from_u8(
+            self.me_last_drain_gate_block_reason
+                .load(Ordering::Relaxed),
+        )
+    }
+
+    pub(crate) fn last_drain_gate_updated_at_epoch_secs(&self) -> u64 {
+        self.me_last_drain_gate_updated_at_epoch_secs
+            .load(Ordering::Relaxed)
+    }
+
     pub fn update_runtime_reinit_policy(
         &self,
         hardswap: bool,
@@ -1021,9 +1254,10 @@ impl MePool {
     }
 
     pub(super) async fn active_coverage_required_total(&self) -> usize {
+        let now_epoch_secs = Self::now_epoch_secs();
        let mut endpoints_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
 
-        if self.decision.ipv4_me {
+        if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch_secs) {
             let map = self.proxy_map_v4.read().await;
             for (dc, addrs) in map.iter() {
                 let entry = endpoints_by_dc.entry(*dc).or_default();
@@ -1033,7 +1267,7 @@ impl MePool {
             }
         }
 
-        if self.decision.ipv6_me {
+        if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch_secs) {
             let map = self.proxy_map_v6.read().await;
             for (dc, addrs) in map.iter() {
                 let entry = endpoints_by_dc.entry(*dc).or_default();
diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs
index e4fb95f..3c5d4b3 100644
--- a/src/transport/middle_proxy/pool_refill.rs
+++ b/src/transport/middle_proxy/pool_refill.rs
@@ -164,9 +164,10 @@ impl MePool {
     }
 
     async fn endpoints_for_dc(&self, target_dc: i32) -> Vec<SocketAddr> {
+        let now_epoch_secs = Self::now_epoch_secs();
         let mut endpoints = HashSet::<SocketAddr>::new();
 
-        if self.decision.ipv4_me {
+        if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch_secs) {
             let map = self.proxy_map_v4.read().await;
             if let Some(addrs) = map.get(&target_dc) {
                 for (ip, port) in addrs {
@@ -175,7 +176,7 @@ impl MePool {
             }
         }
 
-        if self.decision.ipv6_me {
+        if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch_secs) {
             let map = self.proxy_map_v6.read().await;
             if let Some(addrs) = map.get(&target_dc) {
                 for (ip, port) in addrs {
diff --git a/src/transport/middle_proxy/pool_reinit.rs b/src/transport/middle_proxy/pool_reinit.rs
index 0d5c6f4..bfd56c6 100644
--- a/src/transport/middle_proxy/pool_reinit.rs
+++ b/src/transport/middle_proxy/pool_reinit.rs
@@ -11,8 +11,9 @@ use tracing::{debug, info, warn};
 use std::collections::hash_map::DefaultHasher;
 
 use crate::crypto::SecureRandom;
+use crate::network::IpFamily;
 
-use super::pool::{MePool, WriterContour};
+use super::pool::{MeDrainGateReason, MePool, WriterContour};
 
 const ME_HARDSWAP_PENDING_TTL_SECS: u64 = 1800;
 
@@ -120,9 +121,10 @@ impl MePool {
     }
 
     async fn desired_dc_endpoints(&self) -> HashMap<i32, HashSet<SocketAddr>> {
+        let now_epoch_secs = Self::now_epoch_secs();
         let mut out: HashMap<i32, HashSet<SocketAddr>> = HashMap::new();
 
-        if self.decision.ipv4_me {
+        if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch_secs) {
             let map_v4 = self.proxy_map_v4.read().await.clone();
             for (dc, addrs) in map_v4 {
                 let entry = out.entry(dc).or_default();
@@ -132,7 +134,7 @@ impl MePool {
             }
         }
 
-        if self.decision.ipv6_me {
+        if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch_secs) {
             let map_v6 = self.proxy_map_v6.read().await.clone();
             for (dc, addrs) in map_v6 {
                 let entry = out.entry(dc).or_default();
@@ -313,13 +315,23 @@ impl MePool {
 
     pub async fn zero_downtime_reinit_after_map_change(self: &Arc<Self>, rng: &SecureRandom) {
         let desired_by_dc = self.desired_dc_endpoints().await;
+        let now_epoch_secs = Self::now_epoch_secs();
+        let v4_suppressed = self.is_family_temporarily_suppressed(IpFamily::V4, now_epoch_secs);
+        let v6_suppressed = self.is_family_temporarily_suppressed(IpFamily::V6, now_epoch_secs);
         if desired_by_dc.is_empty() {
             warn!("ME endpoint map is empty; skipping stale writer drain");
+            let reason = if (self.decision.ipv4_me && v4_suppressed)
+                || (self.decision.ipv6_me && v6_suppressed)
+            {
+                MeDrainGateReason::SuppressionActive
+            } else {
+                MeDrainGateReason::CoverageQuorum
+            };
+            self.set_last_drain_gate(false, false, reason, now_epoch_secs);
             return;
         }
 
         let desired_map_hash = Self::desired_map_hash(&desired_by_dc);
-        let now_epoch_secs = Self::now_epoch_secs();
         let previous_generation = self.current_generation();
         let hardswap = self.hardswap.load(Ordering::Relaxed);
         let generation = if hardswap {
@@ -390,7 +402,17 @@ impl MePool {
                 .load(Ordering::Relaxed),
         );
         let (coverage_ratio, missing_dc) = Self::coverage_ratio(&desired_by_dc, &active_writer_addrs);
+        let mut route_quorum_ok = coverage_ratio >= min_ratio;
+        let mut redundancy_ok = missing_dc.is_empty();
+        let mut redundancy_missing_dc = missing_dc.clone();
+        let mut gate_coverage_ratio = coverage_ratio;
         if !hardswap && coverage_ratio < min_ratio {
+            self.set_last_drain_gate(
+                false,
+                redundancy_ok,
+                MeDrainGateReason::CoverageQuorum,
+                now_epoch_secs,
+            );
             warn!(
                 previous_generation,
                 generation,
@@ -411,7 +433,17 @@ impl MePool {
                 .collect();
             let (fresh_coverage_ratio, fresh_missing_dc) =
                 Self::coverage_ratio(&desired_by_dc, &fresh_writer_addrs);
-            if !fresh_missing_dc.is_empty() {
+            route_quorum_ok = fresh_coverage_ratio >= min_ratio;
+            redundancy_ok = fresh_missing_dc.is_empty();
+            redundancy_missing_dc = fresh_missing_dc.clone();
+            gate_coverage_ratio = fresh_coverage_ratio;
+            if fresh_coverage_ratio < min_ratio {
+                self.set_last_drain_gate(
+                    false,
+                    redundancy_ok,
+                    MeDrainGateReason::CoverageQuorum,
+                    now_epoch_secs,
+                );
                 warn!(
                     previous_generation,
                     generation,
@@ -421,13 +453,16 @@ impl MePool {
                 );
                 return;
             }
-        } else if !missing_dc.is_empty() {
+        }
+
+        self.set_last_drain_gate(route_quorum_ok, redundancy_ok, MeDrainGateReason::Open, now_epoch_secs);
+        if !redundancy_ok {
             warn!(
-                missing_dc = ?missing_dc,
-                // Keep stale writers alive when fresh coverage is incomplete.
-                "ME reinit coverage incomplete; keeping stale writers"
+                missing_dc = ?redundancy_missing_dc,
+                coverage_ratio = format_args!("{gate_coverage_ratio:.3}"),
+                min_ratio = format_args!("{min_ratio:.3}"),
+                "ME reinit proceeds with weighted quorum while some DC groups remain uncovered"
             );
-            return;
         }
 
         if hardswap {
diff --git a/src/transport/middle_proxy/pool_runtime_api.rs b/src/transport/middle_proxy/pool_runtime_api.rs
index 37ef298..adacd4e 100644
--- a/src/transport/middle_proxy/pool_runtime_api.rs
+++ b/src/transport/middle_proxy/pool_runtime_api.rs
@@ -1,7 +1,7 @@
 use std::collections::HashMap;
 use std::time::Instant;
 
-use super::pool::{MePool, RefillDcKey};
+use super::pool::{MeDrainGateReason, MePool, RefillDcKey};
 use crate::network::IpFamily;
 
 #[derive(Clone, Debug)]
@@ -36,6 +36,24 @@ pub(crate) struct MeApiNatStunSnapshot {
     pub stun_backoff_remaining_ms: Option<u64>,
 }
 
+#[derive(Clone, Debug)]
+pub(crate) struct MeApiFamilyStateSnapshot {
+    pub family: &'static str,
+    pub state: &'static str,
+    pub state_since_epoch_secs: u64,
+    pub suppressed_until_epoch_secs: Option<u64>,
+    pub fail_streak: u32,
+    pub recover_success_streak: u32,
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct MeApiDrainGateSnapshot {
+    pub route_quorum_ok: bool,
+    pub redundancy_ok: bool,
+    pub block_reason: &'static str,
+    pub updated_at_epoch_secs: u64,
+}
+
 impl MePool {
     pub(crate) async fn api_refill_snapshot(&self) -> MeApiRefillSnapshot {
         let inflight_endpoints_total = self.refill_inflight.lock().await.len();
@@ -125,4 +143,35 @@ impl MePool {
             stun_backoff_remaining_ms,
         }
     }
+
+    pub(crate) fn api_family_state_snapshot(&self) -> Vec<MeApiFamilyStateSnapshot> {
+        [IpFamily::V4, IpFamily::V6]
+            .into_iter()
+            .map(|family| {
+                let state = self.family_runtime_state(family);
+                let suppressed_until = self.family_suppressed_until_epoch_secs(family);
+                MeApiFamilyStateSnapshot {
+                    family: match family {
+                        IpFamily::V4 => "v4",
+                        IpFamily::V6 => "v6",
+                    },
+                    state: state.as_str(),
+                    state_since_epoch_secs: self.family_runtime_state_since_epoch_secs(family),
+                    suppressed_until_epoch_secs: (suppressed_until != 0).then_some(suppressed_until),
+                    fail_streak: self.family_fail_streak(family),
+                    recover_success_streak: self.family_recover_success_streak(family),
+                }
+            })
+            .collect()
+    }
+
+    pub(crate) fn api_drain_gate_snapshot(&self) -> MeApiDrainGateSnapshot {
+        let reason: MeDrainGateReason = self.last_drain_gate_block_reason();
+        MeApiDrainGateSnapshot {
+            route_quorum_ok: self.last_drain_gate_route_quorum_ok(),
+            redundancy_ok: self.last_drain_gate_redundancy_ok(),
+            block_reason: reason.as_str(),
+            updated_at_epoch_secs: self.last_drain_gate_updated_at_epoch_secs(),
+        }
+    }
 }
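
Usage note (reviewer addition, not part of the patches): with [PATCH 1/2] applied, scoped upstream routing for TLS front metadata fetches is driven by a single optional key in the [censorship] table. Below is a minimal config sketch modeled on the TOML fixtures in the load.rs tests; the scope name "me" and the access values are illustrative only, not values the patch defines:

    [censorship]
    tls_domain = "example.com"
    # Trimmed at load time: "  me  " and "me" are equivalent, and a
    # whitespace-only value disables scoped routing entirely.
    tls_fetch_scope = "me"

    [access.users]
    user = "00000000000000000000000000000000"

Because normalization runs in ProxyConfig::load before the comparison in warn_non_hot_changes, a whitespace-only edit to tls_fetch_scope should compare equal after reload and not be flagged as a non-hot change.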
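For [PATCH 2/2], the per-family circuit in health.rs is easier to audit as a state machine than as interleaved hunks. The sketch below is a simplified, self-contained model of the intended transitions, not the crate's actual code: the thresholds mirror FAMILY_SUPPRESS_FAIL_STREAK_THRESHOLD (6), FAMILY_SUPPRESS_WINDOW_SECS (120) and FAMILY_RECOVER_SUCCESS_STREAK_REQUIRED (3) from the patch, while the types and the probe loop are illustrative. Note the patch passes allow_suppress = false for IPv4 and true for IPv6, so only the v6 family can ever trip into Suppressed:

    use std::time::{Duration, Instant};

    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum State { Healthy, Degraded, Suppressed, Recovering }

    struct Circuit {
        state: State,
        fail_streak: u32,
        recover_success_streak: u32,
        suppressed_until: Option<Instant>,
    }

    impl Circuit {
        fn observe(&mut self, degraded: bool, allow_suppress: bool, now: Instant) {
            match self.state {
                // First clean probe moves a suppressed family to Recovering.
                State::Suppressed if !degraded => {
                    self.fail_streak = 0;
                    self.recover_success_streak = 1;
                    self.state = State::Recovering;
                }
                // Ongoing failures extend the suppression window.
                State::Suppressed => {
                    self.fail_streak += 1;
                    self.suppressed_until = Some(now + Duration::from_secs(120));
                }
                // Three clean probes in a row lift suppression entirely.
                State::Recovering if !degraded => {
                    self.recover_success_streak += 1;
                    if self.recover_success_streak >= 3 {
                        self.fail_streak = 0;
                        self.recover_success_streak = 0;
                        self.suppressed_until = None;
                        self.state = State::Healthy;
                    }
                }
                // A failed recovery probe re-suppresses (or just degrades).
                State::Recovering => {
                    self.fail_streak += 1;
                    if allow_suppress {
                        self.state = State::Suppressed;
                        self.suppressed_until = Some(now + Duration::from_secs(120));
                    } else {
                        self.state = State::Degraded;
                    }
                }
                // Healthy/Degraded: count failures until the breaker trips.
                _ if degraded => {
                    self.fail_streak += 1;
                    self.recover_success_streak = 0;
                    if allow_suppress && self.fail_streak >= 6 {
                        self.state = State::Suppressed;
                        self.suppressed_until = Some(now + Duration::from_secs(120));
                    } else {
                        self.state = State::Degraded;
                    }
                }
                // Any success outside recovery resets the circuit.
                _ => {
                    self.fail_streak = 0;
                    self.recover_success_streak = 0;
                    self.suppressed_until = None;
                    self.state = State::Healthy;
                }
            }
        }
    }

    fn main() {
        let now = Instant::now();
        let mut c = Circuit {
            state: State::Healthy,
            fail_streak: 0,
            recover_success_streak: 0,
            suppressed_until: None,
        };
        for _ in 0..6 { c.observe(true, true, now); }  // six failures trip the breaker
        assert_eq!(c.state, State::Suppressed);
        assert!(c.suppressed_until.is_some());
        c.observe(false, true, now);                   // first clean probe
        assert_eq!(c.state, State::Recovering);
        for _ in 0..2 { c.observe(false, true, now); } // two more clean probes
        assert_eq!(c.state, State::Healthy);
    }

The design consequence visible in the pool.rs hunks is that a suppressed family drops out of family_enabled_for_drain_coverage, so coverage quorum and refill targets are computed as if that family were unconfigured until its window expires or recovery probes succeed.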