From dc8951eae845afc4a620d6ef6767362c0b38c4ef Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 22 May 2026 20:19:09 +0300 Subject: [PATCH] Reduce MR + ME Routing hot-path contention --- src/proxy/middle_relay/idle.rs | 196 ++++++++---------- src/proxy/shared_state.rs | 4 +- ...lay_idle_registry_poison_security_tests.rs | 66 +++--- src/transport/middle_proxy/registry.rs | 14 +- src/transport/middle_proxy/registry/writer.rs | 192 ++++++++++------- src/transport/middle_proxy/send.rs | 49 ++--- src/transport/middle_proxy/send/recovery.rs | 48 ++--- src/transport/middle_proxy/send/selection.rs | 7 +- 8 files changed, 293 insertions(+), 283 deletions(-) diff --git a/src/proxy/middle_relay/idle.rs b/src/proxy/middle_relay/idle.rs index 3a33869..0653fc5 100644 --- a/src/proxy/middle_relay/idle.rs +++ b/src/proxy/middle_relay/idle.rs @@ -1,4 +1,5 @@ use super::*; +use dashmap::DashMap; mod read; @@ -10,10 +11,10 @@ pub(crate) use self::read::{ #[derive(Default)] pub(crate) struct RelayIdleCandidateRegistry { - pub(in crate::proxy::middle_relay) by_conn_id: HashMap, - pub(in crate::proxy::middle_relay) ordered: BTreeSet<(u64, u64)>, - pressure_event_seq: u64, - pressure_consumed_seq: u64, + pub(in crate::proxy::middle_relay) by_conn_id: DashMap, + pub(in crate::proxy::middle_relay) ordered: parking_lot::Mutex>, + pressure_event_seq: AtomicU64, + pressure_consumed_seq: AtomicU64, } /// Queue metadata used to preserve FIFO ordering for idle relay eviction. @@ -23,25 +24,10 @@ pub(in crate::proxy::middle_relay) struct RelayIdleCandidateMeta { pub(in crate::proxy::middle_relay) mark_pressure_seq: u64, } -pub(super) fn relay_idle_candidate_registry_lock_in( - shared: &ProxySharedState, -) -> std::sync::MutexGuard<'_, RelayIdleCandidateRegistry> { - let registry = &shared.middle_relay.relay_idle_registry; - match registry.lock() { - Ok(guard) => guard, - Err(poisoned) => { - let mut guard = poisoned.into_inner(); - *guard = RelayIdleCandidateRegistry::default(); - registry.clear_poison(); - guard - } - } -} - pub(super) fn mark_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) -> bool { - let mut guard = relay_idle_candidate_registry_lock_in(shared); + let registry = &shared.middle_relay.relay_idle_registry; - if guard.by_conn_id.contains_key(&conn_id) { + if registry.by_conn_id.contains_key(&conn_id) { return false; } @@ -52,24 +38,35 @@ pub(super) fn mark_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u .saturating_add(1); let meta = RelayIdleCandidateMeta { mark_order_seq, - mark_pressure_seq: guard.pressure_event_seq, + mark_pressure_seq: registry.pressure_event_seq.load(Ordering::Relaxed), }; - guard.by_conn_id.insert(conn_id, meta); - guard.ordered.insert((meta.mark_order_seq, conn_id)); - true + match registry.by_conn_id.entry(conn_id) { + dashmap::mapref::entry::Entry::Occupied(_) => false, + dashmap::mapref::entry::Entry::Vacant(entry) => { + entry.insert(meta); + registry.ordered.lock().insert((meta.mark_order_seq, conn_id)); + true + } + } } pub(super) fn clear_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) { - let mut guard = relay_idle_candidate_registry_lock_in(shared); + let registry = &shared.middle_relay.relay_idle_registry; - if let Some(meta) = guard.by_conn_id.remove(&conn_id) { - guard.ordered.remove(&(meta.mark_order_seq, conn_id)); + if let Some((_, meta)) = registry.by_conn_id.remove(&conn_id) { + registry + .ordered + .lock() + .remove(&(meta.mark_order_seq, conn_id)); } } pub(super) fn note_relay_pressure_event_in(shared: &ProxySharedState) { - let mut guard = relay_idle_candidate_registry_lock_in(shared); - guard.pressure_event_seq = guard.pressure_event_seq.wrapping_add(1); + shared + .middle_relay + .relay_idle_registry + .pressure_event_seq + .fetch_add(1, Ordering::Relaxed); } pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) { @@ -77,8 +74,11 @@ pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) { } pub(super) fn relay_pressure_event_seq_in(shared: &ProxySharedState) -> u64 { - let guard = relay_idle_candidate_registry_lock_in(shared); - guard.pressure_event_seq + shared + .middle_relay + .relay_idle_registry + .pressure_event_seq + .load(Ordering::Relaxed) } pub(super) fn maybe_evict_idle_candidate_on_pressure_in( @@ -87,33 +87,43 @@ pub(super) fn maybe_evict_idle_candidate_on_pressure_in( seen_pressure_seq: &mut u64, stats: &Stats, ) -> bool { - let mut guard = relay_idle_candidate_registry_lock_in(shared); + let registry = &shared.middle_relay.relay_idle_registry; - let latest_pressure_seq = guard.pressure_event_seq; + let latest_pressure_seq = registry.pressure_event_seq.load(Ordering::Relaxed); if latest_pressure_seq == *seen_pressure_seq { return false; } *seen_pressure_seq = latest_pressure_seq; - if latest_pressure_seq == guard.pressure_consumed_seq { + if latest_pressure_seq == registry.pressure_consumed_seq.load(Ordering::Relaxed) { return false; } - if guard.ordered.is_empty() { - guard.pressure_consumed_seq = latest_pressure_seq; - return false; - } - - let oldest = guard - .ordered - .iter() - .next() - .map(|(_, candidate_conn_id)| *candidate_conn_id); + let oldest = { + let mut ordered = registry.ordered.lock(); + loop { + let Some((mark_order_seq, candidate_conn_id)) = ordered.iter().next().copied() else { + registry + .pressure_consumed_seq + .store(latest_pressure_seq, Ordering::Relaxed); + return false; + }; + let Some(candidate_meta) = registry.by_conn_id.get(&candidate_conn_id) else { + ordered.remove(&(mark_order_seq, candidate_conn_id)); + continue; + }; + if candidate_meta.mark_order_seq != mark_order_seq { + ordered.remove(&(mark_order_seq, candidate_conn_id)); + continue; + } + break Some(candidate_conn_id); + } + }; if oldest != Some(conn_id) { return false; } - let Some(candidate_meta) = guard.by_conn_id.get(&conn_id).copied() else { + let Some(candidate_meta) = registry.by_conn_id.get(&conn_id).map(|entry| *entry.value()) else { return false; }; @@ -121,10 +131,15 @@ pub(super) fn maybe_evict_idle_candidate_on_pressure_in( return false; } - if let Some(meta) = guard.by_conn_id.remove(&conn_id) { - guard.ordered.remove(&(meta.mark_order_seq, conn_id)); + if let Some((_, meta)) = registry.by_conn_id.remove(&conn_id) { + registry + .ordered + .lock() + .remove(&(meta.mark_order_seq, conn_id)); } - guard.pressure_consumed_seq = latest_pressure_seq; + registry + .pressure_consumed_seq + .store(latest_pressure_seq, Ordering::Relaxed); stats.increment_relay_pressure_evict_total(); true } @@ -220,72 +235,32 @@ pub(crate) fn mark_relay_idle_candidate_for_testing( shared: &ProxySharedState, conn_id: u64, ) -> bool { - let registry = &shared.middle_relay.relay_idle_registry; - let mut guard = match registry.lock() { - Ok(guard) => guard, - Err(poisoned) => { - let mut guard = poisoned.into_inner(); - *guard = RelayIdleCandidateRegistry::default(); - registry.clear_poison(); - guard - } - }; - - if guard.by_conn_id.contains_key(&conn_id) { - return false; - } - - let mark_order_seq = shared - .middle_relay - .relay_idle_mark_seq - .fetch_add(1, Ordering::Relaxed); - let mark_pressure_seq = guard.pressure_event_seq; - let meta = RelayIdleCandidateMeta { - mark_order_seq, - mark_pressure_seq, - }; - guard.by_conn_id.insert(conn_id, meta); - guard.ordered.insert((mark_order_seq, conn_id)); - true + mark_relay_idle_candidate_in(shared, conn_id) } #[cfg(test)] pub(crate) fn oldest_relay_idle_candidate_for_testing(shared: &ProxySharedState) -> Option { let registry = &shared.middle_relay.relay_idle_registry; - let guard = match registry.lock() { - Ok(guard) => guard, - Err(poisoned) => { - let mut guard = poisoned.into_inner(); - *guard = RelayIdleCandidateRegistry::default(); - registry.clear_poison(); - guard - } - }; - guard.ordered.iter().next().map(|(_, conn_id)| *conn_id) + registry + .ordered + .lock() + .iter() + .next() + .map(|(_, conn_id)| *conn_id) } #[cfg(test)] pub(crate) fn clear_relay_idle_candidate_for_testing(shared: &ProxySharedState, conn_id: u64) { - let registry = &shared.middle_relay.relay_idle_registry; - let mut guard = match registry.lock() { - Ok(guard) => guard, - Err(poisoned) => { - let mut guard = poisoned.into_inner(); - *guard = RelayIdleCandidateRegistry::default(); - registry.clear_poison(); - guard - } - }; - if let Some(meta) = guard.by_conn_id.remove(&conn_id) { - guard.ordered.remove(&(meta.mark_order_seq, conn_id)); - } + clear_relay_idle_candidate_in(shared, conn_id); } #[cfg(test)] pub(crate) fn clear_relay_idle_pressure_state_for_testing_in_shared(shared: &ProxySharedState) { - if let Ok(mut guard) = shared.middle_relay.relay_idle_registry.lock() { - *guard = RelayIdleCandidateRegistry::default(); - } + let registry = &shared.middle_relay.relay_idle_registry; + registry.by_conn_id.clear(); + registry.ordered.lock().clear(); + registry.pressure_event_seq.store(0, Ordering::Relaxed); + registry.pressure_consumed_seq.store(0, Ordering::Relaxed); shared .middle_relay .relay_idle_mark_seq @@ -327,15 +302,10 @@ pub(crate) fn set_relay_pressure_state_for_testing( pressure_consumed_seq: u64, ) { let registry = &shared.middle_relay.relay_idle_registry; - let mut guard = match registry.lock() { - Ok(guard) => guard, - Err(poisoned) => { - let mut guard = poisoned.into_inner(); - *guard = RelayIdleCandidateRegistry::default(); - registry.clear_poison(); - guard - } - }; - guard.pressure_event_seq = pressure_event_seq; - guard.pressure_consumed_seq = pressure_consumed_seq; + registry + .pressure_event_seq + .store(pressure_event_seq, Ordering::Relaxed); + registry + .pressure_consumed_seq + .store(pressure_consumed_seq, Ordering::Relaxed); } diff --git a/src/proxy/shared_state.rs b/src/proxy/shared_state.rs index e204890..11e390e 100644 --- a/src/proxy/shared_state.rs +++ b/src/proxy/shared_state.rs @@ -59,7 +59,7 @@ pub(crate) struct MiddleRelaySharedState { pub(crate) desync_hasher: RandomState, pub(crate) desync_full_cache_last_emit_at: Mutex>, pub(crate) desync_dedup_rotation_state: Mutex, - pub(crate) relay_idle_registry: Mutex, + pub(crate) relay_idle_registry: RelayIdleCandidateRegistry, pub(crate) relay_idle_mark_seq: AtomicU64, } @@ -97,7 +97,7 @@ impl ProxySharedState { desync_hasher: RandomState::new(), desync_full_cache_last_emit_at: Mutex::new(None), desync_dedup_rotation_state: Mutex::new(DesyncDedupRotationState::default()), - relay_idle_registry: Mutex::new(RelayIdleCandidateRegistry::default()), + relay_idle_registry: RelayIdleCandidateRegistry::default(), relay_idle_mark_seq: AtomicU64::new(0), }, traffic_limiter: TrafficLimiter::new(), diff --git a/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs b/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs index 4f57f56..107bb04 100644 --- a/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs +++ b/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs @@ -1,33 +1,21 @@ use super::*; -use std::panic::{AssertUnwindSafe, catch_unwind}; #[test] -fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_accounting() { +fn blackhat_registry_stale_order_entry_is_skipped_and_pressure_accounting_continues() { let shared = ProxySharedState::new(); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); - let _ = catch_unwind(AssertUnwindSafe(|| { - let mut guard = shared - .middle_relay - .relay_idle_registry - .lock() - .expect("registry lock must be acquired before poison"); - guard.by_conn_id.insert( - 999, - RelayIdleCandidateMeta { - mark_order_seq: 1, - mark_pressure_seq: 0, - }, - ); - guard.ordered.insert((1, 999)); - panic!("intentional poison for idle-registry recovery"); - })); + shared + .middle_relay + .relay_idle_registry + .ordered + .lock() + .insert((0, 999)); - // Helper lock must recover from poison, reset stale state, and continue. assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 42)); assert_eq!( oldest_relay_idle_candidate_for_testing(shared.as_ref()), - Some(42) + Some(999) ); let before = relay_pressure_event_seq_for_testing(shared.as_ref()); @@ -35,25 +23,43 @@ fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_account let after = relay_pressure_event_seq_for_testing(shared.as_ref()); assert!( after > before, - "pressure accounting must still advance after poison" + "pressure accounting must still advance with stale ordered entries" + ); + + let mut seen_pressure_seq = before; + assert!(maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 42, + &mut seen_pressure_seq, + &Stats::new() + )); + assert_eq!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + None ); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); } #[test] -fn clear_state_helper_must_reset_poisoned_registry_for_deterministic_fifo_tests() { +fn clear_state_helper_must_reset_split_registry_for_deterministic_fifo_tests() { let shared = ProxySharedState::new(); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); - let _ = catch_unwind(AssertUnwindSafe(|| { - let _guard = shared - .middle_relay - .relay_idle_registry - .lock() - .expect("registry lock must be acquired before poison"); - panic!("intentional poison while lock held"); - })); + shared.middle_relay.relay_idle_registry.by_conn_id.insert( + 999, + RelayIdleCandidateMeta { + mark_order_seq: 1, + mark_pressure_seq: 0, + }, + ); + shared + .middle_relay + .relay_idle_registry + .ordered + .lock() + .insert((1, 999)); + set_relay_pressure_state_for_testing(shared.as_ref(), 7, 6); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 0f3925e..71f75f8 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -77,26 +77,24 @@ struct HotBindingTable { struct BindingState { inner: Mutex, + writer_idle_since_epoch_secs: DashMap, + bound_clients_by_writer: DashMap, + active_sessions_by_target_dc: DashMap, + last_meta_for_writer: DashMap, } struct BindingInner { - writers: HashMap>, writer_for_conn: HashMap, conns_for_writer: HashMap>, meta: HashMap, - last_meta_for_writer: HashMap, - writer_idle_since_epoch_secs: HashMap, } impl BindingInner { fn new() -> Self { Self { - writers: HashMap::new(), writer_for_conn: HashMap::new(), conns_for_writer: HashMap::new(), meta: HashMap::new(), - last_meta_for_writer: HashMap::new(), - writer_idle_since_epoch_secs: HashMap::new(), } } } @@ -149,6 +147,10 @@ impl ConnRegistry { }, binding: BindingState { inner: Mutex::new(BindingInner::new()), + writer_idle_since_epoch_secs: DashMap::new(), + bound_clients_by_writer: DashMap::new(), + active_sessions_by_target_dc: DashMap::new(), + last_meta_for_writer: DashMap::new(), }, next_id: AtomicU64::new(start), route_channel_capacity, diff --git a/src/transport/middle_proxy/registry/writer.rs b/src/transport/middle_proxy/registry/writer.rs index c2817f0..4cb0d43 100644 --- a/src/transport/middle_proxy/registry/writer.rs +++ b/src/transport/middle_proxy/registry/writer.rs @@ -13,13 +13,55 @@ use super::{ }; impl ConnRegistry { + fn set_writer_bound_count(&self, writer_id: u64, count: usize) { + self.binding.bound_clients_by_writer.insert(writer_id, count); + if count == 0 { + self.binding + .writer_idle_since_epoch_secs + .entry(writer_id) + .or_insert_with(Self::now_epoch_secs); + } else { + self.binding.writer_idle_since_epoch_secs.remove(&writer_id); + } + } + + fn adjust_active_target_dc(&self, target_dc: i16, delta: isize) { + if target_dc == 0 || delta == 0 { + return; + } + if delta > 0 { + self.binding + .active_sessions_by_target_dc + .entry(target_dc) + .and_modify(|count| *count = count.saturating_add(delta as usize)) + .or_insert(delta as usize); + return; + } + + let remove = + if let Some(mut count) = self.binding.active_sessions_by_target_dc.get_mut(&target_dc) { + let decrement = delta.unsigned_abs(); + *count = count.saturating_sub(decrement); + *count == 0 + } else { + false + }; + if remove { + self.binding.active_sessions_by_target_dc.remove(&target_dc); + } + } + pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender) { let mut binding = self.binding.inner.lock().await; - binding.writers.insert(writer_id, tx.clone()); binding .conns_for_writer .entry(writer_id) .or_insert_with(HashSet::new); + self.binding.bound_clients_by_writer.entry(writer_id).or_insert(0); + self.binding + .writer_idle_since_epoch_secs + .entry(writer_id) + .or_insert_with(Self::now_epoch_secs); self.writers.map.insert(writer_id, tx); } @@ -29,19 +71,18 @@ impl ConnRegistry { self.routing.byte_budget.remove(&id); self.hot_binding.map.remove(&id); let mut binding = self.binding.inner.lock().await; - binding.meta.remove(&id); + let previous_meta = binding.meta.remove(&id); + if let Some(meta) = previous_meta.as_ref() { + self.adjust_active_target_dc(meta.target_dc, -1); + } if let Some(writer_id) = binding.writer_for_conn.remove(&id) { - let became_empty = if let Some(set) = binding.conns_for_writer.get_mut(&writer_id) { + let next_count = if let Some(set) = binding.conns_for_writer.get_mut(&writer_id) { set.remove(&id); - set.is_empty() + set.len() } else { - false + 0 }; - if became_empty { - binding - .writer_idle_since_epoch_secs - .insert(writer_id, Self::now_epoch_secs()); - } + self.set_writer_bound_count(writer_id, next_count); return Some(writer_id); } None @@ -248,7 +289,7 @@ impl ConnRegistry { if !self.routing.map.contains_key(&conn_id) { return false; } - if !binding.writers.contains_key(&writer_id) { + if !self.writers.map.contains_key(&writer_id) { return false; } @@ -256,28 +297,32 @@ impl ConnRegistry { if let Some(previous_writer_id) = previous_writer_id && previous_writer_id != writer_id { - let became_empty = + let next_count = if let Some(set) = binding.conns_for_writer.get_mut(&previous_writer_id) { set.remove(&conn_id); - set.is_empty() + set.len() } else { - false + 0 }; - if became_empty { - binding - .writer_idle_since_epoch_secs - .insert(previous_writer_id, Self::now_epoch_secs()); - } + self.set_writer_bound_count(previous_writer_id, next_count); } - binding.meta.insert(conn_id, meta.clone()); - binding.last_meta_for_writer.insert(writer_id, meta.clone()); - binding.writer_idle_since_epoch_secs.remove(&writer_id); - binding - .conns_for_writer - .entry(writer_id) - .or_insert_with(HashSet::new) - .insert(conn_id); + if let Some(previous_meta) = binding.meta.insert(conn_id, meta.clone()) { + self.adjust_active_target_dc(previous_meta.target_dc, -1); + } + self.adjust_active_target_dc(meta.target_dc, 1); + self.binding + .last_meta_for_writer + .insert(writer_id, meta.clone()); + let next_count = { + let set = binding + .conns_for_writer + .entry(writer_id) + .or_insert_with(HashSet::new); + set.insert(conn_id); + set.len() + }; + self.set_writer_bound_count(writer_id, next_count); self.hot_binding .map .insert(conn_id, HotConnBinding { writer_id, meta }); @@ -290,27 +335,38 @@ impl ConnRegistry { .conns_for_writer .entry(writer_id) .or_insert_with(HashSet::new); - binding - .writer_idle_since_epoch_secs - .entry(writer_id) - .or_insert(Self::now_epoch_secs()); + let count = binding + .conns_for_writer + .get(&writer_id) + .map(|set| set.len()) + .unwrap_or(0); + self.set_writer_bound_count(writer_id, count); } pub async fn get_last_writer_meta(&self, writer_id: u64) -> Option { - let binding = self.binding.inner.lock().await; - binding.last_meta_for_writer.get(&writer_id).cloned() + self.binding + .last_meta_for_writer + .get(&writer_id) + .map(|entry| entry.value().clone()) } pub async fn writer_idle_since_snapshot(&self) -> HashMap { - let binding = self.binding.inner.lock().await; - binding.writer_idle_since_epoch_secs.clone() + self.binding + .writer_idle_since_epoch_secs + .iter() + .map(|entry| (*entry.key(), *entry.value())) + .collect() } pub async fn writer_idle_since_for_writer_ids(&self, writer_ids: &[u64]) -> HashMap { - let binding = self.binding.inner.lock().await; let mut out = HashMap::::with_capacity(writer_ids.len()); for writer_id in writer_ids { - if let Some(idle_since) = binding.writer_idle_since_epoch_secs.get(writer_id).copied() { + if let Some(idle_since) = self + .binding + .writer_idle_since_epoch_secs + .get(writer_id) + .map(|entry| *entry.value()) + { out.insert(*writer_id, idle_since); } } @@ -320,25 +376,19 @@ impl ConnRegistry { pub(in crate::transport::middle_proxy) async fn writer_activity_snapshot( &self, ) -> WriterActivitySnapshot { - let binding = self.binding.inner.lock().await; - let mut bound_clients_by_writer = HashMap::::new(); - let mut active_sessions_by_target_dc = HashMap::::new(); - - for (writer_id, conn_ids) in &binding.conns_for_writer { - bound_clients_by_writer.insert(*writer_id, conn_ids.len()); - } - for conn_meta in binding.meta.values() { - if conn_meta.target_dc == 0 { - continue; - } - *active_sessions_by_target_dc - .entry(conn_meta.target_dc) - .or_insert(0) += 1; - } - WriterActivitySnapshot { - bound_clients_by_writer, - active_sessions_by_target_dc, + bound_clients_by_writer: self + .binding + .bound_clients_by_writer + .iter() + .map(|entry| (*entry.key(), *entry.value())) + .collect(), + active_sessions_by_target_dc: self + .binding + .active_sessions_by_target_dc + .iter() + .map(|entry| (*entry.key(), *entry.value())) + .collect(), } } @@ -393,10 +443,10 @@ impl ConnRegistry { pub async fn writer_lost(&self, writer_id: u64) -> Vec { let mut binding = self.binding.inner.lock().await; - binding.writers.remove(&writer_id); self.writers.map.remove(&writer_id); - binding.last_meta_for_writer.remove(&writer_id); - binding.writer_idle_since_epoch_secs.remove(&writer_id); + self.binding.last_meta_for_writer.remove(&writer_id); + self.binding.writer_idle_since_epoch_secs.remove(&writer_id); + self.binding.bound_clients_by_writer.remove(&writer_id); let conns = binding .conns_for_writer .remove(&writer_id) @@ -410,6 +460,10 @@ impl ConnRegistry { continue; } binding.writer_for_conn.remove(&conn_id); + let meta = binding.meta.remove(&conn_id); + if let Some(meta) = meta.as_ref() { + self.adjust_active_target_dc(meta.target_dc, -1); + } let remove_hot = self .hot_binding .map @@ -419,10 +473,10 @@ impl ConnRegistry { if remove_hot { self.hot_binding.map.remove(&conn_id); } - if let Some(m) = binding.meta.get(&conn_id) { + if let Some(m) = meta { out.push(BoundConn { conn_id, - meta: m.clone(), + meta: m, }); } } @@ -438,11 +492,10 @@ impl ConnRegistry { } pub async fn is_writer_empty(&self, writer_id: u64) -> bool { - let binding = self.binding.inner.lock().await; - binding - .conns_for_writer + self.binding + .bound_clients_by_writer .get(&writer_id) - .map(|s| s.is_empty()) + .map(|count| *count.value() == 0) .unwrap_or(true) } @@ -457,21 +510,20 @@ impl ConnRegistry { return false; } - binding.writers.remove(&writer_id); self.writers.map.remove(&writer_id); - binding.last_meta_for_writer.remove(&writer_id); - binding.writer_idle_since_epoch_secs.remove(&writer_id); + self.binding.last_meta_for_writer.remove(&writer_id); + self.binding.writer_idle_since_epoch_secs.remove(&writer_id); + self.binding.bound_clients_by_writer.remove(&writer_id); binding.conns_for_writer.remove(&writer_id); true } #[allow(dead_code)] pub(super) async fn non_empty_writer_ids(&self, writer_ids: &[u64]) -> HashSet { - let binding = self.binding.inner.lock().await; let mut out = HashSet::::with_capacity(writer_ids.len()); for writer_id in writer_ids { - if let Some(conns) = binding.conns_for_writer.get(writer_id) - && !conns.is_empty() + if let Some(count) = self.binding.bound_clients_by_writer.get(writer_id) + && *count.value() > 0 { out.insert(*writer_id); } diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 847d60e..4eefcf0 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -15,7 +15,6 @@ use super::registry::ConnMeta; use super::wire::build_proxy_req_payload; use crate::config::{MeRouteNoWriterMode, MeWriterPickMode}; use crate::error::{ProxyError, Result}; -use crate::network::IpFamily; use crate::stream::PooledBuffer; use rand::seq::SliceRandom; @@ -124,9 +123,8 @@ impl MePool { } let mut writers_snapshot = { - let ws = self.writers.read().await; + let ws = self.writers.snapshot(); if ws.is_empty() { - drop(ws); match no_writer_mode { MeRouteNoWriterMode::AsyncRecoveryFailfast => { let deadline = *no_writer_deadline.get_or_insert_with(|| { @@ -154,38 +152,32 @@ impl MePool { for _ in 0..self.route_runtime.me_route_inline_recovery_attempts.max(1) { - for family in self.family_order() { - let map = match family { - IpFamily::V4 => self.proxy_map_v4.read().await.clone(), - IpFamily::V6 => self.proxy_map_v6.read().await.clone(), - }; - for (dc, addrs) in &map { - for (ip, port) in addrs { - let addr = SocketAddr::new(*ip, *port); - let _ = self - .connect_one_for_dc( - addr, - *dc, - self.rng.as_ref(), - ) - .await; - } + let preferred = self.preferred_endpoints_by_dc.load_full(); + for (dc, addrs) in preferred.iter() { + for addr in addrs { + let _ = self + .connect_one_for_dc( + *addr, + *dc, + self.rng.as_ref(), + ) + .await; } } - if !self.writers.read().await.is_empty() { + if !self.writers.snapshot().is_empty() { break; } } } - if !self.writers.read().await.is_empty() { + if !self.writers.snapshot().is_empty() { continue; } let deadline = *no_writer_deadline.get_or_insert_with(|| { Instant::now() + self.route_runtime.me_route_inline_recovery_wait }); if !self.wait_for_writer_until(deadline).await { - if !self.writers.read().await.is_empty() { + if !self.writers.snapshot().is_empty() { continue; } self.stats.increment_me_no_writer_failfast_total(); @@ -222,7 +214,7 @@ impl MePool { } } } - ws.clone() + ws }; let mut candidate_indices = self @@ -285,7 +277,12 @@ impl MePool { )); } emergency_attempts += 1; - let mut endpoints = self.endpoint_candidates_for_target_dc(routed_dc).await; + let mut endpoints = self + .preferred_endpoints_by_dc + .load() + .get(&routed_dc) + .cloned() + .unwrap_or_default(); endpoints.shuffle(&mut rand::rng()); for addr in endpoints { if self @@ -298,9 +295,7 @@ impl MePool { } tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64)) .await; - let ws2 = self.writers.read().await; - writers_snapshot = ws2.clone(); - drop(ws2); + writers_snapshot = self.writers.snapshot(); candidate_indices = self .candidate_indices_for_dc(&writers_snapshot, routed_dc, false) .await; diff --git a/src/transport/middle_proxy/send/recovery.rs b/src/transport/middle_proxy/send/recovery.rs index 85772da..ab38b5e 100644 --- a/src/transport/middle_proxy/send/recovery.rs +++ b/src/transport/middle_proxy/send/recovery.rs @@ -1,13 +1,9 @@ -use std::collections::HashSet; -use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; use tracing::warn; -use crate::network::IpFamily; - use super::super::MePool; use super::{ HYBRID_GLOBAL_BURST_PERIOD_ROUNDS, HYBRID_RECENT_SUCCESS_WINDOW_MS, @@ -17,18 +13,18 @@ use super::{ impl MePool { pub(super) async fn wait_for_writer_until(&self, deadline: Instant) -> bool { let mut rx = self.writer_epoch.subscribe(); - if !self.writers.read().await.is_empty() { + if !self.writers.snapshot().is_empty() { return true; } let now = Instant::now(); if now >= deadline { - return !self.writers.read().await.is_empty(); + return !self.writers.snapshot().is_empty(); } let timeout = deadline.saturating_duration_since(now); if tokio::time::timeout(timeout, rx.changed()).await.is_ok() { - return !self.writers.read().await.is_empty(); + return !self.writers.snapshot().is_empty(); } - !self.writers.read().await.is_empty() + !self.writers.snapshot().is_empty() } pub(super) async fn wait_for_candidate_until(&self, routed_dc: i32, deadline: Instant) -> bool { @@ -58,11 +54,11 @@ impl MePool { pub(super) async fn has_candidate_for_target_dc(&self, routed_dc: i32) -> bool { let writers_snapshot = { - let ws = self.writers.read().await; + let ws = self.writers.snapshot(); if ws.is_empty() { return false; } - ws.clone() + ws }; let mut candidate_indices = self .candidate_indices_for_dc(&writers_snapshot, routed_dc, false) @@ -79,7 +75,7 @@ impl MePool { self: &Arc, routed_dc: i32, ) -> bool { - let endpoints = self.endpoint_candidates_for_target_dc(routed_dc).await; + let endpoints = self.preferred_endpoints_for_dc(routed_dc).await; if endpoints.is_empty() { return false; } @@ -92,33 +88,19 @@ impl MePool { pub(super) async fn trigger_async_recovery_global(self: &Arc) { self.stats.increment_me_async_recovery_trigger_total(); - let mut seen = HashSet::<(i32, SocketAddr)>::new(); - for family in self.family_order() { - let map_guard = match family { - IpFamily::V4 => self.proxy_map_v4.read().await, - IpFamily::V6 => self.proxy_map_v6.read().await, - }; - for (dc, addrs) in map_guard.iter() { - for (ip, port) in addrs { - let addr = SocketAddr::new(*ip, *port); - if seen.insert((*dc, addr)) { - self.trigger_immediate_refill_for_dc(addr, *dc); - } - if seen.len() >= 8 { - return; - } + let preferred = self.preferred_endpoints_by_dc.load(); + let mut triggered = 0usize; + for (dc, addrs) in preferred.iter() { + for addr in addrs { + self.trigger_immediate_refill_for_dc(*addr, *dc); + triggered = triggered.saturating_add(1); + if triggered >= 8 { + return; } } } } - pub(super) async fn endpoint_candidates_for_target_dc( - &self, - routed_dc: i32, - ) -> Vec { - self.preferred_endpoints_for_dc(routed_dc).await - } - pub(super) async fn maybe_trigger_hybrid_recovery( self: &Arc, routed_dc: i32, diff --git a/src/transport/middle_proxy/send/selection.rs b/src/transport/middle_proxy/send/selection.rs index ac05fa1..834e0c0 100644 --- a/src/transport/middle_proxy/send/selection.rs +++ b/src/transport/middle_proxy/send/selection.rs @@ -15,7 +15,10 @@ impl MePool { routed_dc: i32, include_warm: bool, ) -> Vec { - let preferred = self.preferred_endpoints_for_dc(routed_dc).await; + let preferred_snapshot = self.preferred_endpoints_by_dc.load(); + let Some(preferred) = preferred_snapshot.get(&routed_dc) else { + return Vec::new(); + }; if preferred.is_empty() { return Vec::new(); } @@ -25,7 +28,7 @@ impl MePool { if !self.writer_eligible_for_selection(w, include_warm) { continue; } - if w.writer_dc == routed_dc && preferred.contains(&w.addr) { + if w.writer_dc == routed_dc && preferred.binary_search(&w.addr).is_ok() { out.push(idx); } }