diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index 0ea6692..5602287 100644
--- a/src/config/defaults.rs
+++ b/src/config/defaults.rs
@@ -8,6 +8,12 @@ const DEFAULT_STUN_TCP_FALLBACK: bool = true;
 const DEFAULT_MIDDLE_PROXY_WARM_STANDBY: usize = 16;
 const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8;
 const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16;
+const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2;
+const DEFAULT_ME_SINGLE_ENDPOINT_OUTAGE_MODE_ENABLED: bool = true;
+const DEFAULT_ME_SINGLE_ENDPOINT_OUTAGE_DISABLE_QUARANTINE: bool = true;
+const DEFAULT_ME_SINGLE_ENDPOINT_OUTAGE_BACKOFF_MIN_MS: u64 = 250;
+const DEFAULT_ME_SINGLE_ENDPOINT_OUTAGE_BACKOFF_MAX_MS: u64 = 3000;
+const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_ROTATE_EVERY_SECS: u64 = 900;
 const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 3;
 const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 4;
 const DEFAULT_LISTEN_ADDR_IPV6: &str = "::";
@@ -160,6 +166,36 @@ pub(crate) fn default_me_reconnect_fast_retry_count() -> u32 {
     DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT
 }
 
+/// Serde default for `general.me_single_endpoint_shadow_writers`.
+pub(crate) fn default_me_single_endpoint_shadow_writers() -> u8 {
+    DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS
+}
+
+/// Serde default for `general.me_single_endpoint_outage_mode_enabled`.
+pub(crate) fn default_me_single_endpoint_outage_mode_enabled() -> bool {
+    DEFAULT_ME_SINGLE_ENDPOINT_OUTAGE_MODE_ENABLED
+}
+
+/// Serde default for `general.me_single_endpoint_outage_disable_quarantine`.
+pub(crate) fn default_me_single_endpoint_outage_disable_quarantine() -> bool {
+    DEFAULT_ME_SINGLE_ENDPOINT_OUTAGE_DISABLE_QUARANTINE
+}
+
+/// Serde default for `general.me_single_endpoint_outage_backoff_min_ms`.
+pub(crate) fn default_me_single_endpoint_outage_backoff_min_ms() -> u64 {
+    DEFAULT_ME_SINGLE_ENDPOINT_OUTAGE_BACKOFF_MIN_MS
+}
+
+/// Serde default for `general.me_single_endpoint_outage_backoff_max_ms`.
+pub(crate) fn default_me_single_endpoint_outage_backoff_max_ms() -> u64 {
+    DEFAULT_ME_SINGLE_ENDPOINT_OUTAGE_BACKOFF_MAX_MS
+}
+
+/// Serde default for `general.me_single_endpoint_shadow_rotate_every_secs`.
+pub(crate) fn default_me_single_endpoint_shadow_rotate_every_secs() -> u64 {
+    DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_ROTATE_EVERY_SECS
+}
+
 pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
     DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
 }
diff --git a/src/config/load.rs b/src/config/load.rs
index 17545b9..a027d58 100644
--- a/src/config/load.rs
+++ b/src/config/load.rs
@@ -255,6 +255,32 @@ impl ProxyConfig {
             ));
         }
 
+        if config.general.me_single_endpoint_shadow_writers > 32 {
+            return Err(ProxyError::Config(
+                "general.me_single_endpoint_shadow_writers must be within [0, 32]".to_string(),
+            ));
+        }
+
+        if config.general.me_single_endpoint_outage_backoff_min_ms == 0 {
+            return Err(ProxyError::Config(
+                "general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(),
+            ));
+        }
+
+        if config.general.me_single_endpoint_outage_backoff_max_ms == 0 {
+            return Err(ProxyError::Config(
+                "general.me_single_endpoint_outage_backoff_max_ms must be > 0".to_string(),
+            ));
+        }
+
+        if config.general.me_single_endpoint_outage_backoff_min_ms
+            > config.general.me_single_endpoint_outage_backoff_max_ms
+        {
+            return Err(ProxyError::Config(
+                "general.me_single_endpoint_outage_backoff_min_ms must be <= general.me_single_endpoint_outage_backoff_max_ms".to_string(),
+            ));
+        }
+
         if config.general.beobachten_minutes == 0 {
             return Err(ProxyError::Config(
                 "general.beobachten_minutes must be > 0".to_string(),
@@ -592,6 +618,30 @@ mod tests {
             cfg.general.me_reconnect_fast_retry_count,
             default_me_reconnect_fast_retry_count()
         );
+        assert_eq!(
+            cfg.general.me_single_endpoint_shadow_writers,
+            default_me_single_endpoint_shadow_writers()
+        );
+        assert_eq!(
+            cfg.general.me_single_endpoint_outage_mode_enabled,
+            default_me_single_endpoint_outage_mode_enabled()
+        );
+        assert_eq!(
+            cfg.general.me_single_endpoint_outage_disable_quarantine,
+            default_me_single_endpoint_outage_disable_quarantine()
+        );
+        assert_eq!(
+            cfg.general.me_single_endpoint_outage_backoff_min_ms,
+            default_me_single_endpoint_outage_backoff_min_ms()
+        );
+        assert_eq!(
+            cfg.general.me_single_endpoint_outage_backoff_max_ms,
+            default_me_single_endpoint_outage_backoff_max_ms()
+        );
+        assert_eq!(
+            cfg.general.me_single_endpoint_shadow_rotate_every_secs,
+            default_me_single_endpoint_shadow_rotate_every_secs()
+        );
         assert_eq!(
             cfg.general.upstream_connect_retry_attempts,
             default_upstream_connect_retry_attempts()
@@ -630,6 +680,30 @@ mod tests {
             general.me_reconnect_fast_retry_count,
             default_me_reconnect_fast_retry_count()
         );
+        assert_eq!(
+            general.me_single_endpoint_shadow_writers,
+            default_me_single_endpoint_shadow_writers()
+        );
+        assert_eq!(
+            general.me_single_endpoint_outage_mode_enabled,
+            default_me_single_endpoint_outage_mode_enabled()
+        );
+        assert_eq!(
+            general.me_single_endpoint_outage_disable_quarantine,
+            default_me_single_endpoint_outage_disable_quarantine()
+        );
+        assert_eq!(
+            general.me_single_endpoint_outage_backoff_min_ms,
+            default_me_single_endpoint_outage_backoff_min_ms()
+        );
+        assert_eq!(
+            general.me_single_endpoint_outage_backoff_max_ms,
+            default_me_single_endpoint_outage_backoff_max_ms()
+        );
+        assert_eq!(
+            general.me_single_endpoint_shadow_rotate_every_secs,
+            default_me_single_endpoint_shadow_rotate_every_secs()
+        );
         assert_eq!(
             general.upstream_connect_retry_attempts,
             default_upstream_connect_retry_attempts()
@@ -814,6 +888,49 @@ mod tests {
         let _ = std::fs::remove_file(path);
     }
 
+    #[test]
+    fn me_single_endpoint_outage_backoff_range_is_validated() {
+        let toml = r#"
+            [general]
+            me_single_endpoint_outage_backoff_min_ms = 4000
+            me_single_endpoint_outage_backoff_max_ms = 3000
+
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_me_single_endpoint_outage_backoff_range_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let err = ProxyConfig::load(&path).unwrap_err().to_string();
+        assert!(err.contains(
+            "general.me_single_endpoint_outage_backoff_min_ms must be <= general.me_single_endpoint_outage_backoff_max_ms"
+        ));
+        let _ = std::fs::remove_file(path);
+    }
+
+    #[test]
+    fn me_single_endpoint_shadow_writers_too_large_is_rejected() {
+        let toml = r#"
+            [general]
+            me_single_endpoint_shadow_writers = 33
+
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_me_single_endpoint_shadow_writers_limit_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let err = ProxyConfig::load(&path).unwrap_err().to_string();
+        assert!(err.contains("general.me_single_endpoint_shadow_writers must be within [0, 32]"));
+        let _ = std::fs::remove_file(path);
+    }
+
     #[test]
     fn upstream_connect_retry_attempts_zero_is_rejected() {
         let toml = r#"
diff --git a/src/config/types.rs b/src/config/types.rs
index d57c890..324abf2 100644
--- a/src/config/types.rs
+++ b/src/config/types.rs
@@ -394,6 +394,31 @@ pub struct GeneralConfig {
     #[serde(default = "default_me_reconnect_fast_retry_count")]
     pub me_reconnect_fast_retry_count: u32,
 
+    /// Additional reserve writers for DC groups with exactly one endpoint (valid range: 0..=32).
+    #[serde(default = "default_me_single_endpoint_shadow_writers")]
+    pub me_single_endpoint_shadow_writers: u8,
+
+    /// Enable aggressive outage recovery mode for single-endpoint DC groups.
+    #[serde(default = "default_me_single_endpoint_outage_mode_enabled")]
+    pub me_single_endpoint_outage_mode_enabled: bool,
+
+    /// Ignore endpoint quarantine while in single-endpoint outage mode.
+    #[serde(default = "default_me_single_endpoint_outage_disable_quarantine")]
+    pub me_single_endpoint_outage_disable_quarantine: bool,
+
+    /// Minimum reconnect backoff in ms for single-endpoint outage mode (must be > 0 and <= max).
+    #[serde(default = "default_me_single_endpoint_outage_backoff_min_ms")]
+    pub me_single_endpoint_outage_backoff_min_ms: u64,
+
+    /// Maximum reconnect backoff in ms for single-endpoint outage mode (must be > 0 and >= min).
+    #[serde(default = "default_me_single_endpoint_outage_backoff_max_ms")]
+    pub me_single_endpoint_outage_backoff_max_ms: u64,
+
+    /// Periodic shadow writer rotation interval in seconds for single-endpoint DC groups.
+    /// Set to 0 to disable periodic shadow rotation.
+    #[serde(default = "default_me_single_endpoint_shadow_rotate_every_secs")]
+    pub me_single_endpoint_shadow_rotate_every_secs: u64,
+
     /// Connect attempts for the selected upstream before returning error/fallback.
     #[serde(default = "default_upstream_connect_retry_attempts")]
     pub upstream_connect_retry_attempts: u32,
@@ -603,6 +628,12 @@ impl Default for GeneralConfig {
             me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
             me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
             me_reconnect_fast_retry_count: default_me_reconnect_fast_retry_count(),
+            me_single_endpoint_shadow_writers: default_me_single_endpoint_shadow_writers(),
+            me_single_endpoint_outage_mode_enabled: default_me_single_endpoint_outage_mode_enabled(),
+            me_single_endpoint_outage_disable_quarantine: default_me_single_endpoint_outage_disable_quarantine(),
+            me_single_endpoint_outage_backoff_min_ms: default_me_single_endpoint_outage_backoff_min_ms(),
+            me_single_endpoint_outage_backoff_max_ms: default_me_single_endpoint_outage_backoff_max_ms(),
+            me_single_endpoint_shadow_rotate_every_secs: default_me_single_endpoint_shadow_rotate_every_secs(),
             upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(),
             upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(),
             upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(),
diff --git a/src/main.rs b/src/main.rs
index 03998cd..4ff79fc 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -538,6 +538,12 @@ async fn main() -> std::result::Result<(), Box> {
             config.general.me_reconnect_backoff_base_ms,
             config.general.me_reconnect_backoff_cap_ms,
             config.general.me_reconnect_fast_retry_count,
+            config.general.me_single_endpoint_shadow_writers,
+            config.general.me_single_endpoint_outage_mode_enabled,
+            config.general.me_single_endpoint_outage_disable_quarantine,
+            config.general.me_single_endpoint_outage_backoff_min_ms,
+            config.general.me_single_endpoint_outage_backoff_max_ms,
+            config.general.me_single_endpoint_shadow_rotate_every_secs,
             config.general.hardswap,
             config.general.me_pool_drain_ttl_secs,
             config.general.effective_me_pool_force_close_secs(),
diff --git a/src/metrics.rs b/src/metrics.rs
index fcbd03c..4f4c317 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -479,6 +479,47 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
         }
     );
 
+    // Single-endpoint outage and shadow-rotation counters all render the same way:
+    // HELP line, TYPE line, then the value (reported as 0 unless `me_allows_normal`).
+    let single_endpoint_counters: [(&str, &str, fn(&Stats) -> u64); 6] = [
+        (
+            "telemt_me_single_endpoint_outage_enter_total",
+            "Single-endpoint DC outage transitions to active state",
+            Stats::get_me_single_endpoint_outage_enter_total,
+        ),
+        (
+            "telemt_me_single_endpoint_outage_exit_total",
+            "Single-endpoint DC outage recovery transitions",
+            Stats::get_me_single_endpoint_outage_exit_total,
+        ),
+        (
+            "telemt_me_single_endpoint_outage_reconnect_attempt_total",
+            "Reconnect attempts performed during single-endpoint outages",
+            Stats::get_me_single_endpoint_outage_reconnect_attempt_total,
+        ),
+        (
+            "telemt_me_single_endpoint_outage_reconnect_success_total",
+            "Successful reconnect attempts during single-endpoint outages",
+            Stats::get_me_single_endpoint_outage_reconnect_success_total,
+        ),
+        (
+            "telemt_me_single_endpoint_quarantine_bypass_total",
+            "Outage reconnect attempts that bypassed quarantine",
+            Stats::get_me_single_endpoint_quarantine_bypass_total,
+        ),
+        (
+            "telemt_me_single_endpoint_shadow_rotate_total",
+            "Successful periodic shadow rotations for single-endpoint DC groups",
+            Stats::get_me_single_endpoint_shadow_rotate_total,
+        ),
+    ];
+    for (metric_name, help_text, read_counter) in single_endpoint_counters {
+        let sample = if me_allows_normal { read_counter(stats) } else { 0 };
+        let _ = writeln!(out, "# HELP {} {}", metric_name, help_text);
+        let _ = writeln!(out, "# TYPE {} counter", metric_name);
+        let _ = writeln!(out, "{} {}", metric_name, sample);
+    }
+
     let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths");
     let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
     let _ = writeln!(
diff --git a/src/stats/mod.rs b/src/stats/mod.rs
index 453a73a..5b2bb94 100644
--- a/src/stats/mod.rs
+++ b/src/stats/mod.rs
@@ -40,6 +40,12 @@ pub struct Stats {
     me_kdf_drift_total: AtomicU64,
     me_hardswap_pending_reuse_total: AtomicU64,
     me_hardswap_pending_ttl_expired_total: AtomicU64,
+    me_single_endpoint_outage_enter_total: AtomicU64,
+    me_single_endpoint_outage_exit_total: AtomicU64,
+    me_single_endpoint_outage_reconnect_attempt_total: AtomicU64,
+    me_single_endpoint_outage_reconnect_success_total: AtomicU64,
+    me_single_endpoint_quarantine_bypass_total: AtomicU64,
+    me_single_endpoint_shadow_rotate_total: AtomicU64,
     me_handshake_error_codes: DashMap,
     me_route_drop_no_conn: AtomicU64,
     me_route_drop_channel_closed: AtomicU64,
@@ -383,6 +389,54 @@ impl Stats {
                 .fetch_add(1, Ordering::Relaxed);
         }
     }
+    /// Count one single-endpoint outage entry; a no-op unless ME telemetry allows normal counters.
+    pub fn increment_me_single_endpoint_outage_enter_total(&self) {
+        if !self.telemetry_me_allows_normal() {
+            return;
+        }
+        self.me_single_endpoint_outage_enter_total
+            .fetch_add(1, Ordering::Relaxed);
+    }
+    /// Count one single-endpoint outage exit; a no-op unless ME telemetry allows normal counters.
+    pub fn increment_me_single_endpoint_outage_exit_total(&self) {
+        if !self.telemetry_me_allows_normal() {
+            return;
+        }
+        self.me_single_endpoint_outage_exit_total
+            .fetch_add(1, Ordering::Relaxed);
+    }
+    /// Count one outage reconnect attempt; a no-op unless ME telemetry allows normal counters.
+    pub fn increment_me_single_endpoint_outage_reconnect_attempt_total(&self) {
+        if !self.telemetry_me_allows_normal() {
+            return;
+        }
+        self.me_single_endpoint_outage_reconnect_attempt_total
+            .fetch_add(1, Ordering::Relaxed);
+    }
+    /// Count one successful outage reconnect; a no-op unless ME telemetry allows normal counters.
+    pub fn increment_me_single_endpoint_outage_reconnect_success_total(&self) {
+        if !self.telemetry_me_allows_normal() {
+            return;
+        }
+        self.me_single_endpoint_outage_reconnect_success_total
+            .fetch_add(1, Ordering::Relaxed);
+    }
+    /// Count one quarantine-bypassing attempt; a no-op unless ME telemetry allows normal counters.
+    pub fn increment_me_single_endpoint_quarantine_bypass_total(&self) {
+        if !self.telemetry_me_allows_normal() {
+            return;
+        }
+        self.me_single_endpoint_quarantine_bypass_total
+            .fetch_add(1, Ordering::Relaxed);
+    }
+    /// Count one shadow writer rotation; a no-op unless ME telemetry allows normal counters.
+    pub fn increment_me_single_endpoint_shadow_rotate_total(&self) {
+        if !self.telemetry_me_allows_normal() {
+            return;
+        }
+        self.me_single_endpoint_shadow_rotate_total
+            .fetch_add(1, Ordering::Relaxed);
+    }
     pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
     pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
     pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
@@ -413,6 +467,36 @@ impl Stats {
         self.me_hardswap_pending_ttl_expired_total
             .load(Ordering::Relaxed)
     }
+    /// Current number of recorded single-endpoint outage entries.
+    pub fn get_me_single_endpoint_outage_enter_total(&self) -> u64 {
+        let counter = &self.me_single_endpoint_outage_enter_total;
+        counter.load(Ordering::Relaxed)
+    }
+    /// Current number of recorded single-endpoint outage exits.
+    pub fn get_me_single_endpoint_outage_exit_total(&self) -> u64 {
+        let counter = &self.me_single_endpoint_outage_exit_total;
+        counter.load(Ordering::Relaxed)
+    }
+    /// Current number of recorded outage reconnect attempts.
+    pub fn get_me_single_endpoint_outage_reconnect_attempt_total(&self) -> u64 {
+        let counter = &self.me_single_endpoint_outage_reconnect_attempt_total;
+        counter.load(Ordering::Relaxed)
+    }
+    /// Current number of recorded successful outage reconnects.
+    pub fn get_me_single_endpoint_outage_reconnect_success_total(&self) -> u64 {
+        let counter = &self.me_single_endpoint_outage_reconnect_success_total;
+        counter.load(Ordering::Relaxed)
+    }
+    /// Current number of recorded quarantine-bypassing attempts.
+    pub fn get_me_single_endpoint_quarantine_bypass_total(&self) -> u64 {
+        let counter = &self.me_single_endpoint_quarantine_bypass_total;
+        counter.load(Ordering::Relaxed)
+    }
+    /// Current number of recorded shadow writer rotations.
+    pub fn get_me_single_endpoint_shadow_rotate_total(&self) -> u64 {
+        let counter = &self.me_single_endpoint_shadow_rotate_total;
+        counter.load(Ordering::Relaxed)
+    }
     pub fn get_me_handshake_error_code_counts(&self) -> Vec<(i32, u64)> {
         let mut out: Vec<(i32, u64)> = self
             .me_handshake_error_codes
diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs
index 2772b27..6da14fa 100644
--- a/src/transport/middle_proxy/config_updater.rs
+++ b/src/transport/middle_proxy/config_updater.rs
@@ -276,6 +276,12 @@ async fn run_update_cycle(
         cfg.general.me_bind_stale_ttl_secs,
         cfg.general.me_secret_atomic_snapshot,
         cfg.general.me_deterministic_writer_sort,
+        cfg.general.me_single_endpoint_shadow_writers,
+        cfg.general.me_single_endpoint_outage_mode_enabled,
+        cfg.general.me_single_endpoint_outage_disable_quarantine,
+        cfg.general.me_single_endpoint_outage_backoff_min_ms,
+        cfg.general.me_single_endpoint_outage_backoff_max_ms,
+        cfg.general.me_single_endpoint_shadow_rotate_every_secs,
     );
 
     let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
@@ -478,6 +484,12 @@ pub async fn me_config_updater(
                     cfg.general.me_bind_stale_ttl_secs,
                     cfg.general.me_secret_atomic_snapshot,
                     cfg.general.me_deterministic_writer_sort,
+                    cfg.general.me_single_endpoint_shadow_writers,
+                    cfg.general.me_single_endpoint_outage_mode_enabled,
+                    cfg.general.me_single_endpoint_outage_disable_quarantine,
+                    cfg.general.me_single_endpoint_outage_backoff_min_ms,
+                    cfg.general.me_single_endpoint_outage_backoff_max_ms,
+                    cfg.general.me_single_endpoint_shadow_rotate_every_secs,
                 );
                 let new_secs = cfg.general.effective_update_every_secs().max(1);
                 if new_secs == update_every_secs {
diff --git a/src/transport/middle_proxy/health.rs
b/src/transport/middle_proxy/health.rs index afa96c6..c07ec4f 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -1,10 +1,11 @@ use std::collections::HashMap; +use std::collections::HashSet; use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, Instant}; -use tracing::{debug, info, warn}; use rand::Rng; +use tracing::{debug, info, warn}; use crate::crypto::SecureRandom; use crate::network::IpFamily; @@ -15,11 +16,16 @@ const HEALTH_INTERVAL_SECS: u64 = 1; const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff #[allow(dead_code)] const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1; +const SHADOW_ROTATE_RETRY_SECS: u64 = 30; pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut inflight: HashMap<(i32, IpFamily), usize> = HashMap::new(); + let mut outage_backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); + let mut outage_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new(); + let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new(); loop { tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await; pool.prune_closed_writers().await; @@ -30,6 +36,10 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut backoff, &mut next_attempt, &mut inflight, + &mut outage_backoff, + &mut outage_next_attempt, + &mut single_endpoint_outage, + &mut shadow_rotate_deadline, ) .await; check_family( @@ -39,6 +49,10 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut backoff, &mut next_attempt, &mut inflight, + &mut outage_backoff, + &mut outage_next_attempt, + &mut single_endpoint_outage, + &mut shadow_rotate_deadline, ) .await; } @@ -51,6 +65,10 @@ async fn check_family( backoff: &mut HashMap<(i32, 
IpFamily), u64>, next_attempt: &mut HashMap<(i32, IpFamily), Instant>, inflight: &mut HashMap<(i32, IpFamily), usize>, + outage_backoff: &mut HashMap<(i32, IpFamily), u64>, + outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, + single_endpoint_outage: &mut HashSet<(i32, IpFamily)>, + shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>, ) { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, @@ -78,31 +96,86 @@ async fn check_family( } let mut live_addr_counts = HashMap::::new(); - for writer in pool - .writers - .read() - .await - .iter() - .filter(|w| !w.draining.load(std::sync::atomic::Ordering::Relaxed)) - { + let mut live_writer_ids_by_addr = HashMap::>::new(); + for writer in pool.writers.read().await.iter().filter(|w| { + !w.draining.load(std::sync::atomic::Ordering::Relaxed) + }) { *live_addr_counts.entry(writer.addr).or_insert(0) += 1; + live_writer_ids_by_addr + .entry(writer.addr) + .or_default() + .push(writer.id); } for (dc, endpoints) in dc_endpoints { if endpoints.is_empty() { continue; } - let required = MePool::required_writers_for_dc(endpoints.len()); + let required = pool.required_writers_for_dc(endpoints.len()); let alive = endpoints .iter() .map(|addr| *live_addr_counts.get(addr).unwrap_or(&0)) .sum::(); + let key = (dc, family); + + if endpoints.len() == 1 && pool.single_endpoint_outage_mode_enabled() && alive == 0 { + if single_endpoint_outage.insert(key) { + pool.stats.increment_me_single_endpoint_outage_enter_total(); + warn!( + dc = %dc, + ?family, + required, + endpoint_count = endpoints.len(), + "Single-endpoint DC outage detected" + ); + } + + recover_single_endpoint_outage( + pool, + rng, + key, + endpoints[0], + required, + outage_backoff, + outage_next_attempt, + ) + .await; + continue; + } + + if single_endpoint_outage.remove(&key) { + pool.stats.increment_me_single_endpoint_outage_exit_total(); + outage_backoff.remove(&key); + outage_next_attempt.remove(&key); + 
shadow_rotate_deadline.remove(&key); + info!( + dc = %dc, + ?family, + alive, + required, + endpoint_count = endpoints.len(), + "Single-endpoint DC outage recovered" + ); + } + if alive >= required { + maybe_rotate_single_endpoint_shadow( + pool, + rng, + key, + dc, + family, + &endpoints, + alive, + required, + &live_writer_ids_by_addr, + shadow_rotate_deadline, + ) + .await; continue; } let missing = required - alive; - let key = (dc, family); let now = Instant::now(); if let Some(ts) = next_attempt.get(&key) && now < *ts @@ -188,3 +261,207 @@ async fn check_family( } } } + +async fn recover_single_endpoint_outage( + pool: &Arc, + rng: &Arc, + key: (i32, IpFamily), + endpoint: SocketAddr, + required: usize, + outage_backoff: &mut HashMap<(i32, IpFamily), u64>, + outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, +) { + let now = Instant::now(); + if let Some(ts) = outage_next_attempt.get(&key) + && now < *ts + { + return; + } + + let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms(); + pool.stats + .increment_me_single_endpoint_outage_reconnect_attempt_total(); + + let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine(); + let attempt_ok = if bypass_quarantine { + pool.stats + .increment_me_single_endpoint_quarantine_bypass_total(); + match tokio::time::timeout(pool.me_one_timeout, pool.connect_one(endpoint, rng.as_ref())).await { + Ok(Ok(())) => true, + Ok(Err(e)) => { + debug!( + dc = %key.0, + family = ?key.1, + %endpoint, + error = %e, + "Single-endpoint outage reconnect failed (quarantine bypass path)" + ); + false + } + Err(_) => { + debug!( + dc = %key.0, + family = ?key.1, + %endpoint, + "Single-endpoint outage reconnect timed out (quarantine bypass path)" + ); + false + } + } + } else { + let one_endpoint = [endpoint]; + match tokio::time::timeout( + pool.me_one_timeout, + pool.connect_endpoints_round_robin(&one_endpoint, rng.as_ref()), + ) + .await + { + Ok(ok) => ok, + Err(_) => { + debug!( + 
dc = %key.0, + family = ?key.1, + %endpoint, + "Single-endpoint outage reconnect timed out" + ); + false + } + } + }; + + if attempt_ok { + pool.stats + .increment_me_single_endpoint_outage_reconnect_success_total(); + pool.stats.increment_me_reconnect_success(); + outage_backoff.insert(key, min_backoff_ms); + let jitter = min_backoff_ms / JITTER_FRAC_NUM; + let wait = Duration::from_millis(min_backoff_ms) + + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); + outage_next_attempt.insert(key, now + wait); + info!( + dc = %key.0, + family = ?key.1, + %endpoint, + required, + backoff_ms = min_backoff_ms, + "Single-endpoint outage reconnect succeeded" + ); + return; + } + + pool.stats.increment_me_reconnect_attempt(); + let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms); + let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms); + outage_backoff.insert(key, next_ms); + let jitter = next_ms / JITTER_FRAC_NUM; + let wait = Duration::from_millis(next_ms) + + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); + outage_next_attempt.insert(key, now + wait); + warn!( + dc = %key.0, + family = ?key.1, + %endpoint, + required, + backoff_ms = next_ms, + "Single-endpoint outage reconnect scheduled" + ); +} + +async fn maybe_rotate_single_endpoint_shadow( + pool: &Arc, + rng: &Arc, + key: (i32, IpFamily), + dc: i32, + family: IpFamily, + endpoints: &[SocketAddr], + alive: usize, + required: usize, + live_writer_ids_by_addr: &HashMap>, + shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>, +) { + if endpoints.len() != 1 || alive < required { + return; + } + + let Some(interval) = pool.single_endpoint_shadow_rotate_interval() else { + return; + }; + + let now = Instant::now(); + if let Some(deadline) = shadow_rotate_deadline.get(&key) + && now < *deadline + { + return; + } + + let endpoint = endpoints[0]; + let Some(writer_ids) = live_writer_ids_by_addr.get(&endpoint) else { + shadow_rotate_deadline.insert(key, 
now + Duration::from_secs(SHADOW_ROTATE_RETRY_SECS)); + return; + }; + + let mut candidate_writer_id = None; + for writer_id in writer_ids { + if pool.registry.is_writer_empty(*writer_id).await { + candidate_writer_id = Some(*writer_id); + break; + } + } + + let Some(old_writer_id) = candidate_writer_id else { + shadow_rotate_deadline.insert(key, now + Duration::from_secs(SHADOW_ROTATE_RETRY_SECS)); + debug!( + dc = %dc, + ?family, + %endpoint, + alive, + required, + "Single-endpoint shadow rotation skipped: no empty writer candidate" + ); + return; + }; + + let rotate_ok = match tokio::time::timeout(pool.me_one_timeout, pool.connect_one(endpoint, rng.as_ref())).await { + Ok(Ok(())) => true, + Ok(Err(e)) => { + debug!( + dc = %dc, + ?family, + %endpoint, + error = %e, + "Single-endpoint shadow rotation connect failed" + ); + false + } + Err(_) => { + debug!( + dc = %dc, + ?family, + %endpoint, + "Single-endpoint shadow rotation connect timed out" + ); + false + } + }; + + if !rotate_ok { + shadow_rotate_deadline.insert( + key, + now + interval.min(Duration::from_secs(SHADOW_ROTATE_RETRY_SECS)), + ); + return; + } + + pool.mark_writer_draining_with_timeout(old_writer_id, pool.force_close_timeout(), false) + .await; + pool.stats.increment_me_single_endpoint_shadow_rotate_total(); + shadow_rotate_deadline.insert(key, now + interval); + info!( + dc = %dc, + ?family, + %endpoint, + old_writer_id, + rotate_every_secs = interval.as_secs(), + "Single-endpoint shadow writer rotated" + ); +} diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index ed6969a..5a1046b 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -101,6 +101,12 @@ pub struct MePool { pub(super) me_reconnect_backoff_base: Duration, pub(super) me_reconnect_backoff_cap: Duration, pub(super) me_reconnect_fast_retry_count: u32, + pub(super) me_single_endpoint_shadow_writers: AtomicU8, + pub(super) me_single_endpoint_outage_mode_enabled: 
AtomicBool, + pub(super) me_single_endpoint_outage_disable_quarantine: AtomicBool, + pub(super) me_single_endpoint_outage_backoff_min_ms: AtomicU64, + pub(super) me_single_endpoint_outage_backoff_max_ms: AtomicU64, + pub(super) me_single_endpoint_shadow_rotate_every_secs: AtomicU64, pub(super) proxy_map_v4: Arc>>>, pub(super) proxy_map_v6: Arc>>>, pub(super) default_dc: AtomicI32, @@ -189,6 +195,12 @@ impl MePool { me_reconnect_backoff_base_ms: u64, me_reconnect_backoff_cap_ms: u64, me_reconnect_fast_retry_count: u32, + me_single_endpoint_shadow_writers: u8, + me_single_endpoint_outage_mode_enabled: bool, + me_single_endpoint_outage_disable_quarantine: bool, + me_single_endpoint_outage_backoff_min_ms: u64, + me_single_endpoint_outage_backoff_max_ms: u64, + me_single_endpoint_shadow_rotate_every_secs: u64, hardswap: bool, me_pool_drain_ttl_secs: u64, me_pool_force_close_secs: u64, @@ -259,6 +271,22 @@ impl MePool { me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms), me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms), me_reconnect_fast_retry_count, + me_single_endpoint_shadow_writers: AtomicU8::new(me_single_endpoint_shadow_writers), + me_single_endpoint_outage_mode_enabled: AtomicBool::new( + me_single_endpoint_outage_mode_enabled, + ), + me_single_endpoint_outage_disable_quarantine: AtomicBool::new( + me_single_endpoint_outage_disable_quarantine, + ), + me_single_endpoint_outage_backoff_min_ms: AtomicU64::new( + me_single_endpoint_outage_backoff_min_ms, + ), + me_single_endpoint_outage_backoff_max_ms: AtomicU64::new( + me_single_endpoint_outage_backoff_max_ms, + ), + me_single_endpoint_shadow_rotate_every_secs: AtomicU64::new( + me_single_endpoint_shadow_rotate_every_secs, + ), pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), @@ -317,6 +345,12 @@ impl MePool { bind_stale_ttl_secs: u64, secret_atomic_snapshot: bool, deterministic_writer_sort: 
bool, + single_endpoint_shadow_writers: u8, + single_endpoint_outage_mode_enabled: bool, + single_endpoint_outage_disable_quarantine: bool, + single_endpoint_outage_backoff_min_ms: u64, + single_endpoint_outage_backoff_max_ms: u64, + single_endpoint_shadow_rotate_every_secs: u64, ) { self.hardswap.store(hardswap, Ordering::Relaxed); self.me_pool_drain_ttl_secs @@ -341,6 +375,18 @@ impl MePool { .store(secret_atomic_snapshot, Ordering::Relaxed); self.me_deterministic_writer_sort .store(deterministic_writer_sort, Ordering::Relaxed); + self.me_single_endpoint_shadow_writers + .store(single_endpoint_shadow_writers, Ordering::Relaxed); + self.me_single_endpoint_outage_mode_enabled + .store(single_endpoint_outage_mode_enabled, Ordering::Relaxed); + self.me_single_endpoint_outage_disable_quarantine + .store(single_endpoint_outage_disable_quarantine, Ordering::Relaxed); + self.me_single_endpoint_outage_backoff_min_ms + .store(single_endpoint_outage_backoff_min_ms, Ordering::Relaxed); + self.me_single_endpoint_outage_backoff_max_ms + .store(single_endpoint_outage_backoff_max_ms, Ordering::Relaxed); + self.me_single_endpoint_shadow_rotate_every_secs + .store(single_endpoint_shadow_rotate_every_secs, Ordering::Relaxed); } pub fn reset_stun_state(&self) { @@ -405,6 +451,54 @@ impl MePool { MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed)) } + pub(super) fn required_writers_for_dc(&self, endpoint_count: usize) -> usize { + if endpoint_count == 0 { + return 0; + } + if endpoint_count == 1 { + let shadow = self + .me_single_endpoint_shadow_writers + .load(Ordering::Relaxed) as usize; + return (1 + shadow).max(3); + } + endpoint_count.max(3) + } + + pub(super) fn single_endpoint_outage_mode_enabled(&self) -> bool { + self.me_single_endpoint_outage_mode_enabled + .load(Ordering::Relaxed) + } + + pub(super) fn single_endpoint_outage_disable_quarantine(&self) -> bool { + self.me_single_endpoint_outage_disable_quarantine + .load(Ordering::Relaxed) + } + + 
pub(super) fn single_endpoint_outage_backoff_bounds_ms(&self) -> (u64, u64) { + let min_ms = self + .me_single_endpoint_outage_backoff_min_ms + .load(Ordering::Relaxed); + let max_ms = self + .me_single_endpoint_outage_backoff_max_ms + .load(Ordering::Relaxed); + if min_ms <= max_ms { + (min_ms, max_ms) + } else { + (max_ms, min_ms) + } + } + + pub(super) fn single_endpoint_shadow_rotate_interval(&self) -> Option { + let secs = self + .me_single_endpoint_shadow_rotate_every_secs + .load(Ordering::Relaxed); + if secs == 0 { + None + } else { + Some(Duration::from_secs(secs)) + } + } + pub(super) fn family_order(&self) -> Vec { let mut order = Vec::new(); if self.decision.prefer_ipv6() { diff --git a/src/transport/middle_proxy/pool_reinit.rs b/src/transport/middle_proxy/pool_reinit.rs index 33b8cc4..d5242b7 100644 --- a/src/transport/middle_proxy/pool_reinit.rs +++ b/src/transport/middle_proxy/pool_reinit.rs @@ -148,10 +148,6 @@ impl MePool { out } - pub(super) fn required_writers_for_dc(endpoint_count: usize) -> usize { - endpoint_count.max(3) - } - fn hardswap_warmup_connect_delay_ms(&self) -> u64 { let min_ms = self.me_hardswap_warmup_delay_min_ms.load(Ordering::Relaxed); let max_ms = self.me_hardswap_warmup_delay_max_ms.load(Ordering::Relaxed); @@ -221,7 +217,7 @@ impl MePool { let mut endpoint_list: Vec = endpoints.iter().copied().collect(); endpoint_list.sort_unstable(); - let required = Self::required_writers_for_dc(endpoint_list.len()); + let required = self.required_writers_for_dc(endpoint_list.len()); let mut completed = false; let mut last_fresh_count = self .fresh_writer_count_for_endpoints(generation, endpoints) @@ -409,7 +405,7 @@ impl MePool { if endpoints.is_empty() { continue; } - let required = Self::required_writers_for_dc(endpoints.len()); + let required = self.required_writers_for_dc(endpoints.len()); let fresh_count = writers .iter() .filter(|w| !w.draining.load(Ordering::Relaxed))