diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 544e328..d92ae78 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -9,6 +9,9 @@ const DEFAULT_MIDDLE_PROXY_WARM_STANDBY: usize = 16; const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8; const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16; const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2; +const DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS: u64 = 90; +const DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT: u8 = 1; +const DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS: u64 = 180; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 3; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 4; const DEFAULT_LISTEN_ADDR_IPV6: &str = "::"; @@ -185,6 +188,18 @@ pub(crate) fn default_me_single_endpoint_shadow_rotate_every_secs() -> u64 { 900 } +pub(crate) fn default_me_adaptive_floor_idle_secs() -> u64 { + DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS +} + +pub(crate) fn default_me_adaptive_floor_min_writers_single_endpoint() -> u8 { + DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT +} + +pub(crate) fn default_me_adaptive_floor_recover_grace_secs() -> u64 { + DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS +} + pub(crate) fn default_upstream_connect_retry_attempts() -> u32 { DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index e16cff2..caec078 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -32,7 +32,7 @@ use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher}; use tokio::sync::{mpsc, watch}; use tracing::{error, info, warn}; -use crate::config::{LogLevel, MeSocksKdfPolicy, MeTelemetryLevel}; +use crate::config::{LogLevel, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel}; use super::load::ProxyConfig; // ── Hot fields ──────────────────────────────────────────────────────────────── @@ -58,6 +58,10 @@ pub struct HotFields { pub telemetry_user_enabled: bool, pub telemetry_me_level: MeTelemetryLevel, pub me_socks_kdf_policy: MeSocksKdfPolicy, + pub me_floor_mode: MeFloorMode, + pub me_adaptive_floor_idle_secs: u64, + pub me_adaptive_floor_min_writers_single_endpoint: u8, + pub me_adaptive_floor_recover_grace_secs: u64, pub me_route_backpressure_base_timeout_ms: u64, pub me_route_backpressure_high_timeout_ms: u64, pub me_route_backpressure_high_watermark_pct: u8, @@ -85,6 +89,14 @@ impl HotFields { telemetry_user_enabled: cfg.general.telemetry.user_enabled, telemetry_me_level: cfg.general.telemetry.me_level, me_socks_kdf_policy: cfg.general.me_socks_kdf_policy, + me_floor_mode: cfg.general.me_floor_mode, + me_adaptive_floor_idle_secs: cfg.general.me_adaptive_floor_idle_secs, + me_adaptive_floor_min_writers_single_endpoint: cfg + .general + .me_adaptive_floor_min_writers_single_endpoint, + me_adaptive_floor_recover_grace_secs: cfg + .general + .me_adaptive_floor_recover_grace_secs, me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms, me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms, me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct, @@ -309,6 +321,22 @@ fn log_changes( ); } + if old_hot.me_floor_mode != new_hot.me_floor_mode + || old_hot.me_adaptive_floor_idle_secs != new_hot.me_adaptive_floor_idle_secs + || old_hot.me_adaptive_floor_min_writers_single_endpoint + != new_hot.me_adaptive_floor_min_writers_single_endpoint + || old_hot.me_adaptive_floor_recover_grace_secs + != new_hot.me_adaptive_floor_recover_grace_secs + { + info!( + "config reload: me_floor: mode={:?} idle={}s min_single={} recover_grace={}s", + new_hot.me_floor_mode, + new_hot.me_adaptive_floor_idle_secs, + new_hot.me_adaptive_floor_min_writers_single_endpoint, + new_hot.me_adaptive_floor_recover_grace_secs, + ); + } + if old_hot.me_route_backpressure_base_timeout_ms != new_hot.me_route_backpressure_base_timeout_ms || old_hot.me_route_backpressure_high_timeout_ms diff --git a/src/config/load.rs b/src/config/load.rs index a027d58..e549b55 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -261,6 +261,15 @@ impl ProxyConfig { )); } + if config.general.me_adaptive_floor_min_writers_single_endpoint == 0 + || config.general.me_adaptive_floor_min_writers_single_endpoint > 32 + { + return Err(ProxyError::Config( + "general.me_adaptive_floor_min_writers_single_endpoint must be within [1, 32]" + .to_string(), + )); + } + if config.general.me_single_endpoint_outage_backoff_min_ms == 0 { return Err(ProxyError::Config( "general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(), @@ -642,6 +651,19 @@ mod tests { cfg.general.me_single_endpoint_shadow_rotate_every_secs, default_me_single_endpoint_shadow_rotate_every_secs() ); + assert_eq!(cfg.general.me_floor_mode, MeFloorMode::default()); + assert_eq!( + cfg.general.me_adaptive_floor_idle_secs, + default_me_adaptive_floor_idle_secs() + ); + assert_eq!( + cfg.general.me_adaptive_floor_min_writers_single_endpoint, + default_me_adaptive_floor_min_writers_single_endpoint() + ); + assert_eq!( + cfg.general.me_adaptive_floor_recover_grace_secs, + default_me_adaptive_floor_recover_grace_secs() + ); assert_eq!( cfg.general.upstream_connect_retry_attempts, default_upstream_connect_retry_attempts() @@ -704,6 +726,19 @@ mod tests { general.me_single_endpoint_shadow_rotate_every_secs, default_me_single_endpoint_shadow_rotate_every_secs() ); + assert_eq!(general.me_floor_mode, MeFloorMode::default()); + assert_eq!( + general.me_adaptive_floor_idle_secs, + default_me_adaptive_floor_idle_secs() + ); + assert_eq!( + general.me_adaptive_floor_min_writers_single_endpoint, + default_me_adaptive_floor_min_writers_single_endpoint() + ); + assert_eq!( + general.me_adaptive_floor_recover_grace_secs, + default_me_adaptive_floor_recover_grace_secs() + ); assert_eq!( general.upstream_connect_retry_attempts, default_upstream_connect_retry_attempts() @@ -931,6 +966,50 @@ mod tests { let _ = std::fs::remove_file(path); } + #[test] + fn me_adaptive_floor_min_writers_out_of_range_is_rejected() { + let toml = r#" + [general] + me_adaptive_floor_min_writers_single_endpoint = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_adaptive_floor_min_writers_out_of_range_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!( + err.contains( + "general.me_adaptive_floor_min_writers_single_endpoint must be within [1, 32]" + ) + ); + let _ = std::fs::remove_file(path); + } + + #[test] + fn me_floor_mode_adaptive_is_parsed() { + let toml = r#" + [general] + me_floor_mode = "adaptive" + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_floor_mode_adaptive_test.toml"); + std::fs::write(&path, toml).unwrap(); + let cfg = ProxyConfig::load(&path).unwrap(); + assert_eq!(cfg.general.me_floor_mode, MeFloorMode::Adaptive); + let _ = std::fs::remove_file(path); + } + #[test] fn upstream_connect_retry_attempts_zero_is_rejected() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index 324abf2..0d255ba 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -158,6 +158,31 @@ impl MeBindStaleMode { } } +/// Middle-End writer floor policy mode. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum MeFloorMode { + #[default] + Static, + Adaptive, +} + +impl MeFloorMode { + pub fn as_u8(self) -> u8 { + match self { + MeFloorMode::Static => 0, + MeFloorMode::Adaptive => 1, + } + } + + pub fn from_u8(raw: u8) -> Self { + match raw { + 1 => MeFloorMode::Adaptive, + _ => MeFloorMode::Static, + } + } +} + /// Telemetry controls for hot-path counters and ME diagnostics. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct TelemetryConfig { @@ -419,6 +444,22 @@ pub struct GeneralConfig { #[serde(default = "default_me_single_endpoint_shadow_rotate_every_secs")] pub me_single_endpoint_shadow_rotate_every_secs: u64, + /// Floor policy mode for ME writer targets. + #[serde(default)] + pub me_floor_mode: MeFloorMode, + + /// Idle time in seconds before adaptive floor can reduce single-endpoint writer target. + #[serde(default = "default_me_adaptive_floor_idle_secs")] + pub me_adaptive_floor_idle_secs: u64, + + /// Minimum writer target for single-endpoint DC groups in adaptive floor mode. + #[serde(default = "default_me_adaptive_floor_min_writers_single_endpoint")] + pub me_adaptive_floor_min_writers_single_endpoint: u8, + + /// Grace period in seconds to hold static floor after activity in adaptive mode. + #[serde(default = "default_me_adaptive_floor_recover_grace_secs")] + pub me_adaptive_floor_recover_grace_secs: u64, + /// Connect attempts for the selected upstream before returning error/fallback. #[serde(default = "default_upstream_connect_retry_attempts")] pub upstream_connect_retry_attempts: u32, @@ -634,6 +675,10 @@ impl Default for GeneralConfig { me_single_endpoint_outage_backoff_min_ms: default_me_single_endpoint_outage_backoff_min_ms(), me_single_endpoint_outage_backoff_max_ms: default_me_single_endpoint_outage_backoff_max_ms(), me_single_endpoint_shadow_rotate_every_secs: default_me_single_endpoint_shadow_rotate_every_secs(), + me_floor_mode: MeFloorMode::default(), + me_adaptive_floor_idle_secs: default_me_adaptive_floor_idle_secs(), + me_adaptive_floor_min_writers_single_endpoint: default_me_adaptive_floor_min_writers_single_endpoint(), + me_adaptive_floor_recover_grace_secs: default_me_adaptive_floor_recover_grace_secs(), upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(), upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(), upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(), diff --git a/src/main.rs b/src/main.rs index 4ff79fc..b890233 100644 --- a/src/main.rs +++ b/src/main.rs @@ -544,6 +544,10 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_single_endpoint_outage_backoff_min_ms, config.general.me_single_endpoint_outage_backoff_max_ms, config.general.me_single_endpoint_shadow_rotate_every_secs, + config.general.me_floor_mode, + config.general.me_adaptive_floor_idle_secs, + config.general.me_adaptive_floor_min_writers_single_endpoint, + config.general.me_adaptive_floor_recover_grace_secs, config.general.hardswap, config.general.me_pool_drain_ttl_secs, config.general.effective_me_pool_force_close_secs(), diff --git a/src/metrics.rs b/src/metrics.rs index 4c03dc2..f8a6716 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -620,6 +620,64 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_me_floor_mode Runtime ME writer floor policy mode" + ); + let _ = writeln!(out, "# TYPE telemt_me_floor_mode gauge"); + let floor_mode = config.general.me_floor_mode; + let _ = writeln!( + out, + "telemt_me_floor_mode{{mode=\"static\"}} {}", + if matches!(floor_mode, crate::config::MeFloorMode::Static) { + 1 + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_floor_mode{{mode=\"adaptive\"}} {}", + if matches!(floor_mode, crate::config::MeFloorMode::Adaptive) { + 1 + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_floor_mode_switch_all_total Runtime ME floor mode switches" + ); + let _ = writeln!(out, "# TYPE telemt_me_floor_mode_switch_all_total counter"); + let _ = writeln!( + out, + "telemt_me_floor_mode_switch_all_total {}", + if me_allows_normal { + stats.get_me_floor_mode_switch_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_floor_mode_switch_total{{from=\"static\",to=\"adaptive\"}} {}", + if me_allows_normal { + stats.get_me_floor_mode_switch_static_to_adaptive_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_floor_mode_switch_total{{from=\"adaptive\",to=\"static\"}} {}", + if me_allows_normal { + stats.get_me_floor_mode_switch_adaptive_to_static_total() + } else { + 0 + } + ); + let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths"); let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter"); let _ = writeln!( diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 57b732d..8152599 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -48,6 +48,9 @@ pub struct Stats { me_single_endpoint_quarantine_bypass_total: AtomicU64, me_single_endpoint_shadow_rotate_total: AtomicU64, me_single_endpoint_shadow_rotate_skipped_quarantine_total: AtomicU64, + me_floor_mode_switch_total: AtomicU64, + me_floor_mode_switch_static_to_adaptive_total: AtomicU64, + me_floor_mode_switch_adaptive_to_static_total: AtomicU64, me_handshake_error_codes: DashMap, me_route_drop_no_conn: AtomicU64, me_route_drop_channel_closed: AtomicU64, @@ -439,6 +442,24 @@ impl Stats { .fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_floor_mode_switch_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_floor_mode_switch_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_floor_mode_switch_static_to_adaptive_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_floor_mode_switch_static_to_adaptive_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_floor_mode_switch_adaptive_to_static_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_floor_mode_switch_adaptive_to_static_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } @@ -500,6 +521,17 @@ impl Stats { self.me_single_endpoint_shadow_rotate_skipped_quarantine_total .load(Ordering::Relaxed) } + pub fn get_me_floor_mode_switch_total(&self) -> u64 { + self.me_floor_mode_switch_total.load(Ordering::Relaxed) + } + pub fn get_me_floor_mode_switch_static_to_adaptive_total(&self) -> u64 { + self.me_floor_mode_switch_static_to_adaptive_total + .load(Ordering::Relaxed) + } + pub fn get_me_floor_mode_switch_adaptive_to_static_total(&self) -> u64 { + self.me_floor_mode_switch_adaptive_to_static_total + .load(Ordering::Relaxed) + } pub fn get_me_handshake_error_code_counts(&self) -> Vec<(i32, u64)> { let mut out: Vec<(i32, u64)> = self .me_handshake_error_codes diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 6da14fa..a9c50ab 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -282,6 +282,10 @@ async fn run_update_cycle( cfg.general.me_single_endpoint_outage_backoff_min_ms, cfg.general.me_single_endpoint_outage_backoff_max_ms, cfg.general.me_single_endpoint_shadow_rotate_every_secs, + cfg.general.me_floor_mode, + cfg.general.me_adaptive_floor_idle_secs, + cfg.general.me_adaptive_floor_min_writers_single_endpoint, + cfg.general.me_adaptive_floor_recover_grace_secs, ); let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1); @@ -490,6 +494,10 @@ pub async fn me_config_updater( cfg.general.me_single_endpoint_outage_backoff_min_ms, cfg.general.me_single_endpoint_outage_backoff_max_ms, cfg.general.me_single_endpoint_shadow_rotate_every_secs, + cfg.general.me_floor_mode, + cfg.general.me_adaptive_floor_idle_secs, + cfg.general.me_adaptive_floor_min_writers_single_endpoint, + cfg.general.me_adaptive_floor_recover_grace_secs, ); let new_secs = cfg.general.effective_update_every_secs().max(1); if new_secs == update_every_secs { diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 192bf1b..55d8409 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -7,6 +7,7 @@ use std::time::{Duration, Instant}; use rand::Rng; use tracing::{debug, info, warn}; +use crate::config::MeFloorMode; use crate::crypto::SecureRandom; use crate::network::IpFamily; @@ -26,6 +27,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c let mut outage_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new(); let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new(); loop { tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await; pool.prune_closed_writers().await; @@ -40,6 +43,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut outage_next_attempt, &mut single_endpoint_outage, &mut shadow_rotate_deadline, + &mut adaptive_idle_since, + &mut adaptive_recover_until, ) .await; check_family( @@ -53,6 +58,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut outage_next_attempt, &mut single_endpoint_outage, &mut shadow_rotate_deadline, + &mut adaptive_idle_since, + &mut adaptive_recover_until, ) .await; } @@ -69,6 +76,8 @@ async fn check_family( outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, single_endpoint_outage: &mut HashSet<(i32, IpFamily)>, shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>, + adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, + adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, ) { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, @@ -95,6 +104,11 @@ async fn check_family( endpoints.dedup(); } + if pool.floor_mode() == MeFloorMode::Static { + adaptive_idle_since.clear(); + adaptive_recover_until.clear(); + } + let mut live_addr_counts = HashMap::::new(); let mut live_writer_ids_by_addr = HashMap::>::new(); for writer in pool.writers.read().await.iter().filter(|w| { @@ -111,12 +125,21 @@ async fn check_family( if endpoints.is_empty() { continue; } - let required = pool.required_writers_for_dc(endpoints.len()); + let key = (dc, family); + let reduce_for_idle = should_reduce_floor_for_idle( + pool, + key, + &endpoints, + &live_writer_ids_by_addr, + adaptive_idle_since, + adaptive_recover_until, + ) + .await; + let required = pool.required_writers_for_dc_with_floor_mode(endpoints.len(), reduce_for_idle); let alive = endpoints .iter() .map(|addr| *live_addr_counts.get(addr).unwrap_or(&0)) .sum::(); - let key = (dc, family); if endpoints.len() == 1 && pool.single_endpoint_outage_mode_enabled() && alive == 0 { if single_endpoint_outage.insert(key) { @@ -148,6 +171,8 @@ async fn check_family( outage_backoff.remove(&key); outage_next_attempt.remove(&key); shadow_rotate_deadline.remove(&key); + adaptive_idle_since.remove(&key); + adaptive_recover_until.remove(&key); info!( dc = %dc, ?family, @@ -262,6 +287,54 @@ async fn check_family( } } +async fn should_reduce_floor_for_idle( + pool: &Arc, + key: (i32, IpFamily), + endpoints: &[SocketAddr], + live_writer_ids_by_addr: &HashMap>, + adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, + adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, +) -> bool { + if endpoints.len() != 1 || pool.floor_mode() != MeFloorMode::Adaptive { + adaptive_idle_since.remove(&key); + adaptive_recover_until.remove(&key); + return false; + } + + let now = Instant::now(); + let endpoint = endpoints[0]; + let writer_ids = live_writer_ids_by_addr + .get(&endpoint) + .map(Vec::as_slice) + .unwrap_or(&[]); + let has_bound_clients = has_bound_clients_on_endpoint(pool, writer_ids).await; + if has_bound_clients { + adaptive_idle_since.remove(&key); + adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration()); + return false; + } + + if let Some(recover_until) = adaptive_recover_until.get(&key) + && now < *recover_until + { + adaptive_idle_since.remove(&key); + return false; + } + adaptive_recover_until.remove(&key); + + let idle_since = adaptive_idle_since.entry(key).or_insert(now); + now.saturating_duration_since(*idle_since) >= pool.adaptive_floor_idle_duration() +} + +async fn has_bound_clients_on_endpoint(pool: &Arc, writer_ids: &[u64]) -> bool { + for writer_id in writer_ids { + if !pool.registry.is_writer_empty(*writer_id).await { + return true; + } + } + false +} + async fn recover_single_endpoint_outage( pool: &Arc, rng: &Arc, diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 14133b4..5ae922a 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -7,7 +7,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::{Mutex, Notify, RwLock, mpsc}; use tokio_util::sync::CancellationToken; -use crate::config::{MeBindStaleMode, MeSocksKdfPolicy}; +use crate::config::{MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy}; use crate::crypto::SecureRandom; use crate::network::IpFamily; use crate::network::probe::NetworkDecision; @@ -107,6 +107,10 @@ pub struct MePool { pub(super) me_single_endpoint_outage_backoff_min_ms: AtomicU64, pub(super) me_single_endpoint_outage_backoff_max_ms: AtomicU64, pub(super) me_single_endpoint_shadow_rotate_every_secs: AtomicU64, + pub(super) me_floor_mode: AtomicU8, + pub(super) me_adaptive_floor_idle_secs: AtomicU64, + pub(super) me_adaptive_floor_min_writers_single_endpoint: AtomicU8, + pub(super) me_adaptive_floor_recover_grace_secs: AtomicU64, pub(super) proxy_map_v4: Arc>>>, pub(super) proxy_map_v6: Arc>>>, pub(super) default_dc: AtomicI32, @@ -201,6 +205,10 @@ impl MePool { me_single_endpoint_outage_backoff_min_ms: u64, me_single_endpoint_outage_backoff_max_ms: u64, me_single_endpoint_shadow_rotate_every_secs: u64, + me_floor_mode: MeFloorMode, + me_adaptive_floor_idle_secs: u64, + me_adaptive_floor_min_writers_single_endpoint: u8, + me_adaptive_floor_recover_grace_secs: u64, hardswap: bool, me_pool_drain_ttl_secs: u64, me_pool_force_close_secs: u64, @@ -287,6 +295,14 @@ impl MePool { me_single_endpoint_shadow_rotate_every_secs: AtomicU64::new( me_single_endpoint_shadow_rotate_every_secs, ), + me_floor_mode: AtomicU8::new(me_floor_mode.as_u8()), + me_adaptive_floor_idle_secs: AtomicU64::new(me_adaptive_floor_idle_secs), + me_adaptive_floor_min_writers_single_endpoint: AtomicU8::new( + me_adaptive_floor_min_writers_single_endpoint, + ), + me_adaptive_floor_recover_grace_secs: AtomicU64::new( + me_adaptive_floor_recover_grace_secs, + ), pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), @@ -351,6 +367,10 @@ impl MePool { single_endpoint_outage_backoff_min_ms: u64, single_endpoint_outage_backoff_max_ms: u64, single_endpoint_shadow_rotate_every_secs: u64, + floor_mode: MeFloorMode, + adaptive_floor_idle_secs: u64, + adaptive_floor_min_writers_single_endpoint: u8, + adaptive_floor_recover_grace_secs: u64, ) { self.hardswap.store(hardswap, Ordering::Relaxed); self.me_pool_drain_ttl_secs @@ -387,6 +407,29 @@ impl MePool { .store(single_endpoint_outage_backoff_max_ms, Ordering::Relaxed); self.me_single_endpoint_shadow_rotate_every_secs .store(single_endpoint_shadow_rotate_every_secs, Ordering::Relaxed); + let previous_floor_mode = self.floor_mode(); + self.me_floor_mode + .store(floor_mode.as_u8(), Ordering::Relaxed); + self.me_adaptive_floor_idle_secs + .store(adaptive_floor_idle_secs, Ordering::Relaxed); + self.me_adaptive_floor_min_writers_single_endpoint + .store(adaptive_floor_min_writers_single_endpoint, Ordering::Relaxed); + self.me_adaptive_floor_recover_grace_secs + .store(adaptive_floor_recover_grace_secs, Ordering::Relaxed); + if previous_floor_mode != floor_mode { + self.stats.increment_me_floor_mode_switch_total(); + match (previous_floor_mode, floor_mode) { + (MeFloorMode::Static, MeFloorMode::Adaptive) => { + self.stats + .increment_me_floor_mode_switch_static_to_adaptive_total(); + } + (MeFloorMode::Adaptive, MeFloorMode::Static) => { + self.stats + .increment_me_floor_mode_switch_adaptive_to_static_total(); + } + _ => {} + } + } } pub fn reset_stun_state(&self) { @@ -464,6 +507,40 @@ impl MePool { endpoint_count.max(3) } + pub(super) fn floor_mode(&self) -> MeFloorMode { + MeFloorMode::from_u8(self.me_floor_mode.load(Ordering::Relaxed)) + } + + pub(super) fn adaptive_floor_idle_duration(&self) -> Duration { + Duration::from_secs(self.me_adaptive_floor_idle_secs.load(Ordering::Relaxed)) + } + + pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration { + Duration::from_secs( + self.me_adaptive_floor_recover_grace_secs + .load(Ordering::Relaxed), + ) + } + + pub(super) fn required_writers_for_dc_with_floor_mode( + &self, + endpoint_count: usize, + reduce_for_idle: bool, + ) -> usize { + let base_required = self.required_writers_for_dc(endpoint_count); + if !reduce_for_idle { + return base_required; + } + if endpoint_count != 1 || self.floor_mode() != MeFloorMode::Adaptive { + return base_required; + } + let min_writers = (self + .me_adaptive_floor_min_writers_single_endpoint + .load(Ordering::Relaxed) as usize) + .max(1); + base_required.min(min_writers) + } + pub(super) fn single_endpoint_outage_mode_enabled(&self) -> bool { self.me_single_endpoint_outage_mode_enabled .load(Ordering::Relaxed)