From 0b78583cf58a14fb4ef1fd2e37eec064f2c0ff77 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 25 Mar 2026 18:18:06 +0300 Subject: [PATCH] ME Routing Core Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/proxy/middle_relay.rs | 4 +++ src/transport/middle_proxy/pool.rs | 32 ++++++++++++++++------- src/transport/middle_proxy/pool_writer.rs | 3 ++- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index ca32e6f..6c7aef9 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -60,6 +60,9 @@ static DESYNC_DEDUP_PREVIOUS: OnceLock> = OnceLock::new(); static DESYNC_HASHER: OnceLock = OnceLock::new(); static DESYNC_FULL_CACHE_LAST_EMIT_AT: OnceLock>> = OnceLock::new(); static DESYNC_DEDUP_ROTATION_STATE: OnceLock> = OnceLock::new(); +// Invariant for async callers: +// this std::sync::Mutex is allowed only because critical sections are short, +// synchronous, and MUST never cross an `.await`. static RELAY_IDLE_CANDIDATE_REGISTRY: OnceLock> = OnceLock::new(); static RELAY_IDLE_MARK_SEQ: AtomicU64 = AtomicU64::new(0); @@ -100,6 +103,7 @@ fn relay_idle_candidate_registry() -> &'static Mutex fn relay_idle_candidate_registry_lock() -> std::sync::MutexGuard<'static, RelayIdleCandidateRegistry> { + // Keep lock scope narrow and synchronous: callers must drop guard before any `.await`. let registry = relay_idle_candidate_registry(); match registry.lock() { Ok(guard) => guard, diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 07d4d19..fdda988 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -257,11 +257,17 @@ pub struct SecretSnapshot { pub secret: Vec, } -#[allow(dead_code)] -pub struct MePool { +pub struct RoutingCore { pub(super) registry: Arc, pub(super) writers: Arc, pub(super) rr: AtomicU64, + pub(super) writer_epoch: watch::Sender, + pub(super) preferred_endpoints_by_dc: ArcSwap>>, +} + +#[allow(dead_code)] +pub struct MePool { + pub(super) routing: Arc, pub(super) decision: NetworkDecision, pub(super) upstream: Option>, pub(super) rng: Arc, @@ -332,7 +338,6 @@ pub struct MePool { pub(super) nat_reflection_cache: Arc>, pub(super) nat_reflection_singleflight_v4: Arc>, pub(super) nat_reflection_singleflight_v6: Arc>, - pub(super) writer_epoch: watch::Sender, pub(super) refill_inflight: Arc>>, pub(super) refill_inflight_dc: Arc>>, pub(super) conn_count: AtomicUsize, @@ -389,7 +394,14 @@ pub struct MePool { pub(super) me_last_drain_gate_updated_at_epoch_secs: AtomicU64, pub(super) runtime_ready: AtomicBool, pool_size: usize, - pub(super) preferred_endpoints_by_dc: ArcSwap>>, +} + +impl Deref for MePool { + type Target = RoutingCore; + + fn deref(&self) -> &Self::Target { + self.routing.as_ref() + } } #[derive(Debug, Default)] @@ -524,9 +536,13 @@ impl MePool { let (writer_epoch, _) = watch::channel(0u64); let now_epoch_secs = Self::now_epoch_secs(); Arc::new(Self { - registry, - writers: Arc::new(WritersState::new()), - rr: AtomicU64::new(0), + routing: Arc::new(RoutingCore { + registry, + writers: Arc::new(WritersState::new()), + rr: AtomicU64::new(0), + writer_epoch, + preferred_endpoints_by_dc: ArcSwap::from_pointee(preferred_endpoints_by_dc), + }), decision, upstream, rng, @@ -644,7 +660,6 @@ impl MePool { nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())), nat_reflection_singleflight_v4: Arc::new(Mutex::new(())), nat_reflection_singleflight_v6: Arc::new(Mutex::new(())), - writer_epoch, refill_inflight: Arc::new(Mutex::new(HashSet::new())), refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())), conn_count: AtomicUsize::new(0), @@ -731,7 +746,6 @@ impl MePool { me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8), me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(now_epoch_secs), runtime_ready: AtomicBool::new(false), - preferred_endpoints_by_dc: ArcSwap::from_pointee(preferred_endpoints_by_dc), }) } diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 506c354..d2d7420 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -596,7 +596,8 @@ impl MePool { let _ = self.registry.writer_lost(writer_id).await; self.rtt_stats.lock().await.remove(&writer_id); if let Some(tx) = close_tx { - let _ = tx.send(WriterCommand::Close).await; + // Keep teardown critical path non-blocking: close is best-effort only. + let _ = tx.try_send(WriterCommand::Close); } if let Some(addr) = removed_addr { if let Some(uptime) = removed_uptime {