From 58ff0c7971fee0ded9f63ec5c52e790e6c757594 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Tue, 3 Mar 2026 03:35:47 +0300 Subject: [PATCH] Update pool.rs Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/pool.rs | 79 +++++++++++++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 14133b4..5ae922a 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -7,7 +7,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::{Mutex, Notify, RwLock, mpsc}; use tokio_util::sync::CancellationToken; -use crate::config::{MeBindStaleMode, MeSocksKdfPolicy}; +use crate::config::{MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy}; use crate::crypto::SecureRandom; use crate::network::IpFamily; use crate::network::probe::NetworkDecision; @@ -107,6 +107,10 @@ pub struct MePool { pub(super) me_single_endpoint_outage_backoff_min_ms: AtomicU64, pub(super) me_single_endpoint_outage_backoff_max_ms: AtomicU64, pub(super) me_single_endpoint_shadow_rotate_every_secs: AtomicU64, + pub(super) me_floor_mode: AtomicU8, + pub(super) me_adaptive_floor_idle_secs: AtomicU64, + pub(super) me_adaptive_floor_min_writers_single_endpoint: AtomicU8, + pub(super) me_adaptive_floor_recover_grace_secs: AtomicU64, pub(super) proxy_map_v4: Arc>>>, pub(super) proxy_map_v6: Arc>>>, pub(super) default_dc: AtomicI32, @@ -201,6 +205,10 @@ impl MePool { me_single_endpoint_outage_backoff_min_ms: u64, me_single_endpoint_outage_backoff_max_ms: u64, me_single_endpoint_shadow_rotate_every_secs: u64, + me_floor_mode: MeFloorMode, + me_adaptive_floor_idle_secs: u64, + me_adaptive_floor_min_writers_single_endpoint: u8, + me_adaptive_floor_recover_grace_secs: u64, hardswap: bool, me_pool_drain_ttl_secs: u64, me_pool_force_close_secs: u64, @@ -287,6 +295,14 @@ impl MePool { me_single_endpoint_shadow_rotate_every_secs: AtomicU64::new( me_single_endpoint_shadow_rotate_every_secs, ), + me_floor_mode: AtomicU8::new(me_floor_mode.as_u8()), + me_adaptive_floor_idle_secs: AtomicU64::new(me_adaptive_floor_idle_secs), + me_adaptive_floor_min_writers_single_endpoint: AtomicU8::new( + me_adaptive_floor_min_writers_single_endpoint, + ), + me_adaptive_floor_recover_grace_secs: AtomicU64::new( + me_adaptive_floor_recover_grace_secs, + ), pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), @@ -351,6 +367,10 @@ impl MePool { single_endpoint_outage_backoff_min_ms: u64, single_endpoint_outage_backoff_max_ms: u64, single_endpoint_shadow_rotate_every_secs: u64, + floor_mode: MeFloorMode, + adaptive_floor_idle_secs: u64, + adaptive_floor_min_writers_single_endpoint: u8, + adaptive_floor_recover_grace_secs: u64, ) { self.hardswap.store(hardswap, Ordering::Relaxed); self.me_pool_drain_ttl_secs @@ -387,6 +407,29 @@ impl MePool { .store(single_endpoint_outage_backoff_max_ms, Ordering::Relaxed); self.me_single_endpoint_shadow_rotate_every_secs .store(single_endpoint_shadow_rotate_every_secs, Ordering::Relaxed); + let previous_floor_mode = self.floor_mode(); + self.me_floor_mode + .store(floor_mode.as_u8(), Ordering::Relaxed); + self.me_adaptive_floor_idle_secs + .store(adaptive_floor_idle_secs, Ordering::Relaxed); + self.me_adaptive_floor_min_writers_single_endpoint + .store(adaptive_floor_min_writers_single_endpoint, Ordering::Relaxed); + self.me_adaptive_floor_recover_grace_secs + .store(adaptive_floor_recover_grace_secs, Ordering::Relaxed); + if previous_floor_mode != floor_mode { + self.stats.increment_me_floor_mode_switch_total(); + match (previous_floor_mode, floor_mode) { + (MeFloorMode::Static, MeFloorMode::Adaptive) => { + self.stats + .increment_me_floor_mode_switch_static_to_adaptive_total(); + } + (MeFloorMode::Adaptive, MeFloorMode::Static) => { + self.stats + .increment_me_floor_mode_switch_adaptive_to_static_total(); + } + _ => {} + } + } } pub fn reset_stun_state(&self) { @@ -464,6 +507,40 @@ impl MePool { endpoint_count.max(3) } + pub(super) fn floor_mode(&self) -> MeFloorMode { + MeFloorMode::from_u8(self.me_floor_mode.load(Ordering::Relaxed)) + } + + pub(super) fn adaptive_floor_idle_duration(&self) -> Duration { + Duration::from_secs(self.me_adaptive_floor_idle_secs.load(Ordering::Relaxed)) + } + + pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration { + Duration::from_secs( + self.me_adaptive_floor_recover_grace_secs + .load(Ordering::Relaxed), + ) + } + + pub(super) fn required_writers_for_dc_with_floor_mode( + &self, + endpoint_count: usize, + reduce_for_idle: bool, + ) -> usize { + let base_required = self.required_writers_for_dc(endpoint_count); + if !reduce_for_idle { + return base_required; + } + if endpoint_count != 1 || self.floor_mode() != MeFloorMode::Adaptive { + return base_required; + } + let min_writers = (self + .me_adaptive_floor_min_writers_single_endpoint + .load(Ordering::Relaxed) as usize) + .max(1); + base_required.min(min_writers) + } + pub(super) fn single_endpoint_outage_mode_enabled(&self) -> bool { self.me_single_endpoint_outage_mode_enabled .load(Ordering::Relaxed)