Compare commits

..

6 Commits

Author SHA1 Message Date
Alexey
dc8951eae8 Reduce MR + ME Routing hot-path contention 2026-05-22 20:19:09 +03:00
Alexey
77a7f89075 Reuse ME reader scratch buffer across read loop iterations 2026-05-22 19:56:38 +03:00
Alexey
9abaf9006c Prioritize Cancellation in MP select paths 2026-05-22 16:47:54 +03:00
Alexey
231f04a810 Bump 2026-05-22 11:00:41 +03:00
Alexey
b32daf79bc Update Cargo.lock 2026-05-22 11:00:18 +03:00
Alexey
f668759c05 API Fixes + Exclusive Mask + Startup Speed-up + IDN + Decomposing hot-path modules: merge pull request #796 from telemt/flow
API Fixes + Exclusive Mask + Startup Speed-up + IDN + Decomposing hot-path modules
2026-05-22 10:55:26 +03:00
13 changed files with 308 additions and 290 deletions

2
Cargo.lock generated
View File

@@ -2791,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]] [[package]]
name = "telemt" name = "telemt"
version = "3.4.11" version = "3.4.12"
dependencies = [ dependencies = [
"aes", "aes",
"anyhow", "anyhow",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.4.11" version = "3.4.12"
edition = "2024" edition = "2024"
[features] [features]

View File

@@ -1,4 +1,5 @@
use super::*; use super::*;
use dashmap::DashMap;
mod read; mod read;
@@ -10,10 +11,10 @@ pub(crate) use self::read::{
#[derive(Default)] #[derive(Default)]
pub(crate) struct RelayIdleCandidateRegistry { pub(crate) struct RelayIdleCandidateRegistry {
pub(in crate::proxy::middle_relay) by_conn_id: HashMap<u64, RelayIdleCandidateMeta>, pub(in crate::proxy::middle_relay) by_conn_id: DashMap<u64, RelayIdleCandidateMeta>,
pub(in crate::proxy::middle_relay) ordered: BTreeSet<(u64, u64)>, pub(in crate::proxy::middle_relay) ordered: parking_lot::Mutex<BTreeSet<(u64, u64)>>,
pressure_event_seq: u64, pressure_event_seq: AtomicU64,
pressure_consumed_seq: u64, pressure_consumed_seq: AtomicU64,
} }
/// Queue metadata used to preserve FIFO ordering for idle relay eviction. /// Queue metadata used to preserve FIFO ordering for idle relay eviction.
@@ -23,25 +24,10 @@ pub(in crate::proxy::middle_relay) struct RelayIdleCandidateMeta {
pub(in crate::proxy::middle_relay) mark_pressure_seq: u64, pub(in crate::proxy::middle_relay) mark_pressure_seq: u64,
} }
pub(super) fn relay_idle_candidate_registry_lock_in(
shared: &ProxySharedState,
) -> std::sync::MutexGuard<'_, RelayIdleCandidateRegistry> {
let registry = &shared.middle_relay.relay_idle_registry;
match registry.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = RelayIdleCandidateRegistry::default();
registry.clear_poison();
guard
}
}
}
pub(super) fn mark_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) -> bool { pub(super) fn mark_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) -> bool {
let mut guard = relay_idle_candidate_registry_lock_in(shared); let registry = &shared.middle_relay.relay_idle_registry;
if guard.by_conn_id.contains_key(&conn_id) { if registry.by_conn_id.contains_key(&conn_id) {
return false; return false;
} }
@@ -52,24 +38,35 @@ pub(super) fn mark_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u
.saturating_add(1); .saturating_add(1);
let meta = RelayIdleCandidateMeta { let meta = RelayIdleCandidateMeta {
mark_order_seq, mark_order_seq,
mark_pressure_seq: guard.pressure_event_seq, mark_pressure_seq: registry.pressure_event_seq.load(Ordering::Relaxed),
}; };
guard.by_conn_id.insert(conn_id, meta); match registry.by_conn_id.entry(conn_id) {
guard.ordered.insert((meta.mark_order_seq, conn_id)); dashmap::mapref::entry::Entry::Occupied(_) => false,
dashmap::mapref::entry::Entry::Vacant(entry) => {
entry.insert(meta);
registry.ordered.lock().insert((meta.mark_order_seq, conn_id));
true true
}
}
} }
pub(super) fn clear_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) { pub(super) fn clear_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) {
let mut guard = relay_idle_candidate_registry_lock_in(shared); let registry = &shared.middle_relay.relay_idle_registry;
if let Some(meta) = guard.by_conn_id.remove(&conn_id) { if let Some((_, meta)) = registry.by_conn_id.remove(&conn_id) {
guard.ordered.remove(&(meta.mark_order_seq, conn_id)); registry
.ordered
.lock()
.remove(&(meta.mark_order_seq, conn_id));
} }
} }
pub(super) fn note_relay_pressure_event_in(shared: &ProxySharedState) { pub(super) fn note_relay_pressure_event_in(shared: &ProxySharedState) {
let mut guard = relay_idle_candidate_registry_lock_in(shared); shared
guard.pressure_event_seq = guard.pressure_event_seq.wrapping_add(1); .middle_relay
.relay_idle_registry
.pressure_event_seq
.fetch_add(1, Ordering::Relaxed);
} }
pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) { pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) {
@@ -77,8 +74,11 @@ pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) {
} }
pub(super) fn relay_pressure_event_seq_in(shared: &ProxySharedState) -> u64 { pub(super) fn relay_pressure_event_seq_in(shared: &ProxySharedState) -> u64 {
let guard = relay_idle_candidate_registry_lock_in(shared); shared
guard.pressure_event_seq .middle_relay
.relay_idle_registry
.pressure_event_seq
.load(Ordering::Relaxed)
} }
pub(super) fn maybe_evict_idle_candidate_on_pressure_in( pub(super) fn maybe_evict_idle_candidate_on_pressure_in(
@@ -87,33 +87,43 @@ pub(super) fn maybe_evict_idle_candidate_on_pressure_in(
seen_pressure_seq: &mut u64, seen_pressure_seq: &mut u64,
stats: &Stats, stats: &Stats,
) -> bool { ) -> bool {
let mut guard = relay_idle_candidate_registry_lock_in(shared); let registry = &shared.middle_relay.relay_idle_registry;
let latest_pressure_seq = guard.pressure_event_seq; let latest_pressure_seq = registry.pressure_event_seq.load(Ordering::Relaxed);
if latest_pressure_seq == *seen_pressure_seq { if latest_pressure_seq == *seen_pressure_seq {
return false; return false;
} }
*seen_pressure_seq = latest_pressure_seq; *seen_pressure_seq = latest_pressure_seq;
if latest_pressure_seq == guard.pressure_consumed_seq { if latest_pressure_seq == registry.pressure_consumed_seq.load(Ordering::Relaxed) {
return false; return false;
} }
if guard.ordered.is_empty() { let oldest = {
guard.pressure_consumed_seq = latest_pressure_seq; let mut ordered = registry.ordered.lock();
loop {
let Some((mark_order_seq, candidate_conn_id)) = ordered.iter().next().copied() else {
registry
.pressure_consumed_seq
.store(latest_pressure_seq, Ordering::Relaxed);
return false; return false;
};
let Some(candidate_meta) = registry.by_conn_id.get(&candidate_conn_id) else {
ordered.remove(&(mark_order_seq, candidate_conn_id));
continue;
};
if candidate_meta.mark_order_seq != mark_order_seq {
ordered.remove(&(mark_order_seq, candidate_conn_id));
continue;
} }
break Some(candidate_conn_id);
let oldest = guard }
.ordered };
.iter()
.next()
.map(|(_, candidate_conn_id)| *candidate_conn_id);
if oldest != Some(conn_id) { if oldest != Some(conn_id) {
return false; return false;
} }
let Some(candidate_meta) = guard.by_conn_id.get(&conn_id).copied() else { let Some(candidate_meta) = registry.by_conn_id.get(&conn_id).map(|entry| *entry.value()) else {
return false; return false;
}; };
@@ -121,10 +131,15 @@ pub(super) fn maybe_evict_idle_candidate_on_pressure_in(
return false; return false;
} }
if let Some(meta) = guard.by_conn_id.remove(&conn_id) { if let Some((_, meta)) = registry.by_conn_id.remove(&conn_id) {
guard.ordered.remove(&(meta.mark_order_seq, conn_id)); registry
.ordered
.lock()
.remove(&(meta.mark_order_seq, conn_id));
} }
guard.pressure_consumed_seq = latest_pressure_seq; registry
.pressure_consumed_seq
.store(latest_pressure_seq, Ordering::Relaxed);
stats.increment_relay_pressure_evict_total(); stats.increment_relay_pressure_evict_total();
true true
} }
@@ -220,72 +235,32 @@ pub(crate) fn mark_relay_idle_candidate_for_testing(
shared: &ProxySharedState, shared: &ProxySharedState,
conn_id: u64, conn_id: u64,
) -> bool { ) -> bool {
let registry = &shared.middle_relay.relay_idle_registry; mark_relay_idle_candidate_in(shared, conn_id)
let mut guard = match registry.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = RelayIdleCandidateRegistry::default();
registry.clear_poison();
guard
}
};
if guard.by_conn_id.contains_key(&conn_id) {
return false;
}
let mark_order_seq = shared
.middle_relay
.relay_idle_mark_seq
.fetch_add(1, Ordering::Relaxed);
let mark_pressure_seq = guard.pressure_event_seq;
let meta = RelayIdleCandidateMeta {
mark_order_seq,
mark_pressure_seq,
};
guard.by_conn_id.insert(conn_id, meta);
guard.ordered.insert((mark_order_seq, conn_id));
true
} }
#[cfg(test)] #[cfg(test)]
pub(crate) fn oldest_relay_idle_candidate_for_testing(shared: &ProxySharedState) -> Option<u64> { pub(crate) fn oldest_relay_idle_candidate_for_testing(shared: &ProxySharedState) -> Option<u64> {
let registry = &shared.middle_relay.relay_idle_registry; let registry = &shared.middle_relay.relay_idle_registry;
let guard = match registry.lock() { registry
Ok(guard) => guard, .ordered
Err(poisoned) => { .lock()
let mut guard = poisoned.into_inner(); .iter()
*guard = RelayIdleCandidateRegistry::default(); .next()
registry.clear_poison(); .map(|(_, conn_id)| *conn_id)
guard
}
};
guard.ordered.iter().next().map(|(_, conn_id)| *conn_id)
} }
#[cfg(test)] #[cfg(test)]
pub(crate) fn clear_relay_idle_candidate_for_testing(shared: &ProxySharedState, conn_id: u64) { pub(crate) fn clear_relay_idle_candidate_for_testing(shared: &ProxySharedState, conn_id: u64) {
let registry = &shared.middle_relay.relay_idle_registry; clear_relay_idle_candidate_in(shared, conn_id);
let mut guard = match registry.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = RelayIdleCandidateRegistry::default();
registry.clear_poison();
guard
}
};
if let Some(meta) = guard.by_conn_id.remove(&conn_id) {
guard.ordered.remove(&(meta.mark_order_seq, conn_id));
}
} }
#[cfg(test)] #[cfg(test)]
pub(crate) fn clear_relay_idle_pressure_state_for_testing_in_shared(shared: &ProxySharedState) { pub(crate) fn clear_relay_idle_pressure_state_for_testing_in_shared(shared: &ProxySharedState) {
if let Ok(mut guard) = shared.middle_relay.relay_idle_registry.lock() { let registry = &shared.middle_relay.relay_idle_registry;
*guard = RelayIdleCandidateRegistry::default(); registry.by_conn_id.clear();
} registry.ordered.lock().clear();
registry.pressure_event_seq.store(0, Ordering::Relaxed);
registry.pressure_consumed_seq.store(0, Ordering::Relaxed);
shared shared
.middle_relay .middle_relay
.relay_idle_mark_seq .relay_idle_mark_seq
@@ -327,15 +302,10 @@ pub(crate) fn set_relay_pressure_state_for_testing(
pressure_consumed_seq: u64, pressure_consumed_seq: u64,
) { ) {
let registry = &shared.middle_relay.relay_idle_registry; let registry = &shared.middle_relay.relay_idle_registry;
let mut guard = match registry.lock() { registry
Ok(guard) => guard, .pressure_event_seq
Err(poisoned) => { .store(pressure_event_seq, Ordering::Relaxed);
let mut guard = poisoned.into_inner(); registry
*guard = RelayIdleCandidateRegistry::default(); .pressure_consumed_seq
registry.clear_poison(); .store(pressure_consumed_seq, Ordering::Relaxed);
guard
}
};
guard.pressure_event_seq = pressure_event_seq;
guard.pressure_consumed_seq = pressure_consumed_seq;
} }

View File

@@ -41,11 +41,12 @@ pub(super) async fn reserve_user_quota_with_yield(
return Err(MiddleQuotaReserveError::DeadlineExceeded); return Err(MiddleQuotaReserveError::DeadlineExceeded);
} }
tokio::select! { tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {} biased;
_ = cancel.cancelled() => { _ = cancel.cancelled() => {
stats.increment_quota_acquire_cancelled_total(); stats.increment_quota_acquire_cancelled_total();
return Err(MiddleQuotaReserveError::Cancelled); return Err(MiddleQuotaReserveError::Cancelled);
} }
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}
} }
backoff_rounds = backoff_rounds.saturating_add(1); backoff_rounds = backoff_rounds.saturating_add(1);
if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS { if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS {
@@ -128,11 +129,12 @@ pub(super) async fn wait_for_traffic_budget_or_cancel(
return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded); return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded);
} }
tokio::select! { tokio::select! {
_ = tokio::time::sleep(next_refill_delay()) => {} biased;
_ = cancel.cancelled() => { _ = cancel.cancelled() => {
stats.increment_flow_wait_middle_rate_limit_cancelled_total(); stats.increment_flow_wait_middle_rate_limit_cancelled_total();
return Err(ProxyError::TrafficBudgetWaitCancelled); return Err(ProxyError::TrafficBudgetWaitCancelled);
} }
_ = tokio::time::sleep(next_refill_delay()) => {}
} }
let wait_ms = wait_started_at let wait_ms = wait_started_at
.elapsed() .elapsed()

View File

@@ -59,7 +59,7 @@ pub(crate) struct MiddleRelaySharedState {
pub(crate) desync_hasher: RandomState, pub(crate) desync_hasher: RandomState,
pub(crate) desync_full_cache_last_emit_at: Mutex<Option<Instant>>, pub(crate) desync_full_cache_last_emit_at: Mutex<Option<Instant>>,
pub(crate) desync_dedup_rotation_state: Mutex<DesyncDedupRotationState>, pub(crate) desync_dedup_rotation_state: Mutex<DesyncDedupRotationState>,
pub(crate) relay_idle_registry: Mutex<RelayIdleCandidateRegistry>, pub(crate) relay_idle_registry: RelayIdleCandidateRegistry,
pub(crate) relay_idle_mark_seq: AtomicU64, pub(crate) relay_idle_mark_seq: AtomicU64,
} }
@@ -97,7 +97,7 @@ impl ProxySharedState {
desync_hasher: RandomState::new(), desync_hasher: RandomState::new(),
desync_full_cache_last_emit_at: Mutex::new(None), desync_full_cache_last_emit_at: Mutex::new(None),
desync_dedup_rotation_state: Mutex::new(DesyncDedupRotationState::default()), desync_dedup_rotation_state: Mutex::new(DesyncDedupRotationState::default()),
relay_idle_registry: Mutex::new(RelayIdleCandidateRegistry::default()), relay_idle_registry: RelayIdleCandidateRegistry::default(),
relay_idle_mark_seq: AtomicU64::new(0), relay_idle_mark_seq: AtomicU64::new(0),
}, },
traffic_limiter: TrafficLimiter::new(), traffic_limiter: TrafficLimiter::new(),

View File

@@ -1,33 +1,21 @@
use super::*; use super::*;
use std::panic::{AssertUnwindSafe, catch_unwind};
#[test] #[test]
fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_accounting() { fn blackhat_registry_stale_order_entry_is_skipped_and_pressure_accounting_continues() {
let shared = ProxySharedState::new(); let shared = ProxySharedState::new();
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
let _ = catch_unwind(AssertUnwindSafe(|| { shared
let mut guard = shared
.middle_relay .middle_relay
.relay_idle_registry .relay_idle_registry
.ordered
.lock() .lock()
.expect("registry lock must be acquired before poison"); .insert((0, 999));
guard.by_conn_id.insert(
999,
RelayIdleCandidateMeta {
mark_order_seq: 1,
mark_pressure_seq: 0,
},
);
guard.ordered.insert((1, 999));
panic!("intentional poison for idle-registry recovery");
}));
// Helper lock must recover from poison, reset stale state, and continue.
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 42)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 42));
assert_eq!( assert_eq!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()), oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(42) Some(999)
); );
let before = relay_pressure_event_seq_for_testing(shared.as_ref()); let before = relay_pressure_event_seq_for_testing(shared.as_ref());
@@ -35,25 +23,43 @@ fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_account
let after = relay_pressure_event_seq_for_testing(shared.as_ref()); let after = relay_pressure_event_seq_for_testing(shared.as_ref());
assert!( assert!(
after > before, after > before,
"pressure accounting must still advance after poison" "pressure accounting must still advance with stale ordered entries"
);
let mut seen_pressure_seq = before;
assert!(maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
42,
&mut seen_pressure_seq,
&Stats::new()
));
assert_eq!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
None
); );
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
} }
#[test] #[test]
fn clear_state_helper_must_reset_poisoned_registry_for_deterministic_fifo_tests() { fn clear_state_helper_must_reset_split_registry_for_deterministic_fifo_tests() {
let shared = ProxySharedState::new(); let shared = ProxySharedState::new();
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
let _ = catch_unwind(AssertUnwindSafe(|| { shared.middle_relay.relay_idle_registry.by_conn_id.insert(
let _guard = shared 999,
RelayIdleCandidateMeta {
mark_order_seq: 1,
mark_pressure_seq: 0,
},
);
shared
.middle_relay .middle_relay
.relay_idle_registry .relay_idle_registry
.ordered
.lock() .lock()
.expect("registry lock must be acquired before poison"); .insert((1, 999));
panic!("intentional poison while lock held"); set_relay_pressure_state_for_testing(shared.as_ref(), 7, 6);
}));
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());

View File

@@ -52,6 +52,8 @@ async fn writer_command_loop(
) -> Result<()> { ) -> Result<()> {
loop { loop {
tokio::select! { tokio::select! {
biased;
_ = cancel.cancelled() => return Ok(()),
cmd = rx.recv() => { cmd = rx.recv() => {
match cmd { match cmd {
Some(WriterCommand::Data(payload)) => { Some(WriterCommand::Data(payload)) => {
@@ -69,7 +71,6 @@ async fn writer_command_loop(
Some(WriterCommand::Close) | None => return Ok(()), Some(WriterCommand::Close) | None => return Ok(()),
} }
} }
_ = cancel.cancelled() => return Ok(()),
} }
} }
} }
@@ -108,6 +109,7 @@ async fn ping_loop(
Duration::from_secs(wait) Duration::from_secs(wait)
}; };
tokio::select! { tokio::select! {
biased;
_ = cancel_ping_token.cancelled() => return, _ = cancel_ping_token.cancelled() => return,
_ = tokio::time::sleep(startup_jitter) => {} _ = tokio::time::sleep(startup_jitter) => {}
} }
@@ -131,6 +133,7 @@ async fn ping_loop(
Duration::from_secs(secs) Duration::from_secs(secs)
}; };
tokio::select! { tokio::select! {
biased;
_ = cancel_ping_token.cancelled() => return, _ = cancel_ping_token.cancelled() => return,
_ = tokio::time::sleep(wait) => {} _ = tokio::time::sleep(wait) => {}
} }
@@ -191,6 +194,7 @@ async fn rpc_proxy_req_signal_loop(
}; };
tokio::select! { tokio::select! {
biased;
_ = cancel_signal.cancelled() => return, _ = cancel_signal.cancelled() => return,
_ = tokio::time::sleep(Duration::from_millis(startup_jitter_ms)) => {} _ = tokio::time::sleep(Duration::from_millis(startup_jitter_ms)) => {}
} }
@@ -207,6 +211,7 @@ async fn rpc_proxy_req_signal_loop(
}; };
tokio::select! { tokio::select! {
biased;
_ = cancel_signal.cancelled() => return, _ = cancel_signal.cancelled() => return,
_ = tokio::time::sleep(wait) => {} _ = tokio::time::sleep(wait) => {}
} }

View File

@@ -242,6 +242,7 @@ pub(crate) async fn reader_loop(
let mut raw = enc_leftover; let mut raw = enc_leftover;
let mut expected_seq: i32 = 0; let mut expected_seq: i32 = 0;
let mut data_route_queue_full_streak = HashMap::<u64, u8>::new(); let mut data_route_queue_full_streak = HashMap::<u64, u8>::new();
let mut tmp = [0u8; 65_536];
let mut fairness = WorkerFairnessState::new( let mut fairness = WorkerFairnessState::new(
WorkerFairnessConfig { WorkerFairnessConfig {
worker_id: (writer_id as u16).saturating_add(1), worker_id: (writer_id as u16).saturating_add(1),
@@ -263,18 +264,18 @@ pub(crate) async fn reader_loop(
let fairshare_enabled = route_fairshare_enabled.load(Ordering::Relaxed); let fairshare_enabled = route_fairshare_enabled.load(Ordering::Relaxed);
fairness.set_backpressure_enabled(backpressure_enabled); fairness.set_backpressure_enabled(backpressure_enabled);
let fairness_has_backlog = should_schedule_fairness_retry(&fairness_snapshot); let fairness_has_backlog = should_schedule_fairness_retry(&fairness_snapshot);
let mut tmp = [0u8; 65_536];
let backlog_retry_enabled = fairness_has_backlog; let backlog_retry_enabled = fairness_has_backlog;
let backlog_retry_delay = let backlog_retry_delay =
fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed)); fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed));
let mut retry_only = false; let mut retry_only = false;
let n = tokio::select! { let n = tokio::select! {
biased;
_ = cancel.cancelled() => return Ok(()),
res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?, res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?,
_ = tokio::time::sleep(backlog_retry_delay), if backlog_retry_enabled => { _ = tokio::time::sleep(backlog_retry_delay), if backlog_retry_enabled => {
retry_only = true; retry_only = true;
0usize 0usize
}, },
_ = cancel.cancelled() => return Ok(()),
}; };
if retry_only { if retry_only {
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed); let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);

View File

@@ -77,26 +77,24 @@ struct HotBindingTable {
struct BindingState { struct BindingState {
inner: Mutex<BindingInner>, inner: Mutex<BindingInner>,
writer_idle_since_epoch_secs: DashMap<u64, u64>,
bound_clients_by_writer: DashMap<u64, usize>,
active_sessions_by_target_dc: DashMap<i16, usize>,
last_meta_for_writer: DashMap<u64, ConnMeta>,
} }
struct BindingInner { struct BindingInner {
writers: HashMap<u64, mpsc::Sender<WriterCommand>>,
writer_for_conn: HashMap<u64, u64>, writer_for_conn: HashMap<u64, u64>,
conns_for_writer: HashMap<u64, HashSet<u64>>, conns_for_writer: HashMap<u64, HashSet<u64>>,
meta: HashMap<u64, ConnMeta>, meta: HashMap<u64, ConnMeta>,
last_meta_for_writer: HashMap<u64, ConnMeta>,
writer_idle_since_epoch_secs: HashMap<u64, u64>,
} }
impl BindingInner { impl BindingInner {
fn new() -> Self { fn new() -> Self {
Self { Self {
writers: HashMap::new(),
writer_for_conn: HashMap::new(), writer_for_conn: HashMap::new(),
conns_for_writer: HashMap::new(), conns_for_writer: HashMap::new(),
meta: HashMap::new(), meta: HashMap::new(),
last_meta_for_writer: HashMap::new(),
writer_idle_since_epoch_secs: HashMap::new(),
} }
} }
} }
@@ -149,6 +147,10 @@ impl ConnRegistry {
}, },
binding: BindingState { binding: BindingState {
inner: Mutex::new(BindingInner::new()), inner: Mutex::new(BindingInner::new()),
writer_idle_since_epoch_secs: DashMap::new(),
bound_clients_by_writer: DashMap::new(),
active_sessions_by_target_dc: DashMap::new(),
last_meta_for_writer: DashMap::new(),
}, },
next_id: AtomicU64::new(start), next_id: AtomicU64::new(start),
route_channel_capacity, route_channel_capacity,

View File

@@ -13,13 +13,55 @@ use super::{
}; };
impl ConnRegistry { impl ConnRegistry {
fn set_writer_bound_count(&self, writer_id: u64, count: usize) {
self.binding.bound_clients_by_writer.insert(writer_id, count);
if count == 0 {
self.binding
.writer_idle_since_epoch_secs
.entry(writer_id)
.or_insert_with(Self::now_epoch_secs);
} else {
self.binding.writer_idle_since_epoch_secs.remove(&writer_id);
}
}
fn adjust_active_target_dc(&self, target_dc: i16, delta: isize) {
if target_dc == 0 || delta == 0 {
return;
}
if delta > 0 {
self.binding
.active_sessions_by_target_dc
.entry(target_dc)
.and_modify(|count| *count = count.saturating_add(delta as usize))
.or_insert(delta as usize);
return;
}
let remove =
if let Some(mut count) = self.binding.active_sessions_by_target_dc.get_mut(&target_dc) {
let decrement = delta.unsigned_abs();
*count = count.saturating_sub(decrement);
*count == 0
} else {
false
};
if remove {
self.binding.active_sessions_by_target_dc.remove(&target_dc);
}
}
pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender<WriterCommand>) { pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender<WriterCommand>) {
let mut binding = self.binding.inner.lock().await; let mut binding = self.binding.inner.lock().await;
binding.writers.insert(writer_id, tx.clone());
binding binding
.conns_for_writer .conns_for_writer
.entry(writer_id) .entry(writer_id)
.or_insert_with(HashSet::new); .or_insert_with(HashSet::new);
self.binding.bound_clients_by_writer.entry(writer_id).or_insert(0);
self.binding
.writer_idle_since_epoch_secs
.entry(writer_id)
.or_insert_with(Self::now_epoch_secs);
self.writers.map.insert(writer_id, tx); self.writers.map.insert(writer_id, tx);
} }
@@ -29,19 +71,18 @@ impl ConnRegistry {
self.routing.byte_budget.remove(&id); self.routing.byte_budget.remove(&id);
self.hot_binding.map.remove(&id); self.hot_binding.map.remove(&id);
let mut binding = self.binding.inner.lock().await; let mut binding = self.binding.inner.lock().await;
binding.meta.remove(&id); let previous_meta = binding.meta.remove(&id);
if let Some(writer_id) = binding.writer_for_conn.remove(&id) { if let Some(meta) = previous_meta.as_ref() {
let became_empty = if let Some(set) = binding.conns_for_writer.get_mut(&writer_id) { self.adjust_active_target_dc(meta.target_dc, -1);
set.remove(&id);
set.is_empty()
} else {
false
};
if became_empty {
binding
.writer_idle_since_epoch_secs
.insert(writer_id, Self::now_epoch_secs());
} }
if let Some(writer_id) = binding.writer_for_conn.remove(&id) {
let next_count = if let Some(set) = binding.conns_for_writer.get_mut(&writer_id) {
set.remove(&id);
set.len()
} else {
0
};
self.set_writer_bound_count(writer_id, next_count);
return Some(writer_id); return Some(writer_id);
} }
None None
@@ -248,7 +289,7 @@ impl ConnRegistry {
if !self.routing.map.contains_key(&conn_id) { if !self.routing.map.contains_key(&conn_id) {
return false; return false;
} }
if !binding.writers.contains_key(&writer_id) { if !self.writers.map.contains_key(&writer_id) {
return false; return false;
} }
@@ -256,28 +297,32 @@ impl ConnRegistry {
if let Some(previous_writer_id) = previous_writer_id if let Some(previous_writer_id) = previous_writer_id
&& previous_writer_id != writer_id && previous_writer_id != writer_id
{ {
let became_empty = let next_count =
if let Some(set) = binding.conns_for_writer.get_mut(&previous_writer_id) { if let Some(set) = binding.conns_for_writer.get_mut(&previous_writer_id) {
set.remove(&conn_id); set.remove(&conn_id);
set.is_empty() set.len()
} else { } else {
false 0
}; };
if became_empty { self.set_writer_bound_count(previous_writer_id, next_count);
binding
.writer_idle_since_epoch_secs
.insert(previous_writer_id, Self::now_epoch_secs());
}
} }
binding.meta.insert(conn_id, meta.clone()); if let Some(previous_meta) = binding.meta.insert(conn_id, meta.clone()) {
binding.last_meta_for_writer.insert(writer_id, meta.clone()); self.adjust_active_target_dc(previous_meta.target_dc, -1);
binding.writer_idle_since_epoch_secs.remove(&writer_id); }
binding self.adjust_active_target_dc(meta.target_dc, 1);
self.binding
.last_meta_for_writer
.insert(writer_id, meta.clone());
let next_count = {
let set = binding
.conns_for_writer .conns_for_writer
.entry(writer_id) .entry(writer_id)
.or_insert_with(HashSet::new) .or_insert_with(HashSet::new);
.insert(conn_id); set.insert(conn_id);
set.len()
};
self.set_writer_bound_count(writer_id, next_count);
self.hot_binding self.hot_binding
.map .map
.insert(conn_id, HotConnBinding { writer_id, meta }); .insert(conn_id, HotConnBinding { writer_id, meta });
@@ -290,27 +335,38 @@ impl ConnRegistry {
.conns_for_writer .conns_for_writer
.entry(writer_id) .entry(writer_id)
.or_insert_with(HashSet::new); .or_insert_with(HashSet::new);
binding let count = binding
.writer_idle_since_epoch_secs .conns_for_writer
.entry(writer_id) .get(&writer_id)
.or_insert(Self::now_epoch_secs()); .map(|set| set.len())
.unwrap_or(0);
self.set_writer_bound_count(writer_id, count);
} }
pub async fn get_last_writer_meta(&self, writer_id: u64) -> Option<ConnMeta> { pub async fn get_last_writer_meta(&self, writer_id: u64) -> Option<ConnMeta> {
let binding = self.binding.inner.lock().await; self.binding
binding.last_meta_for_writer.get(&writer_id).cloned() .last_meta_for_writer
.get(&writer_id)
.map(|entry| entry.value().clone())
} }
pub async fn writer_idle_since_snapshot(&self) -> HashMap<u64, u64> { pub async fn writer_idle_since_snapshot(&self) -> HashMap<u64, u64> {
let binding = self.binding.inner.lock().await; self.binding
binding.writer_idle_since_epoch_secs.clone() .writer_idle_since_epoch_secs
.iter()
.map(|entry| (*entry.key(), *entry.value()))
.collect()
} }
pub async fn writer_idle_since_for_writer_ids(&self, writer_ids: &[u64]) -> HashMap<u64, u64> { pub async fn writer_idle_since_for_writer_ids(&self, writer_ids: &[u64]) -> HashMap<u64, u64> {
let binding = self.binding.inner.lock().await;
let mut out = HashMap::<u64, u64>::with_capacity(writer_ids.len()); let mut out = HashMap::<u64, u64>::with_capacity(writer_ids.len());
for writer_id in writer_ids { for writer_id in writer_ids {
if let Some(idle_since) = binding.writer_idle_since_epoch_secs.get(writer_id).copied() { if let Some(idle_since) = self
.binding
.writer_idle_since_epoch_secs
.get(writer_id)
.map(|entry| *entry.value())
{
out.insert(*writer_id, idle_since); out.insert(*writer_id, idle_since);
} }
} }
@@ -320,25 +376,19 @@ impl ConnRegistry {
pub(in crate::transport::middle_proxy) async fn writer_activity_snapshot( pub(in crate::transport::middle_proxy) async fn writer_activity_snapshot(
&self, &self,
) -> WriterActivitySnapshot { ) -> WriterActivitySnapshot {
let binding = self.binding.inner.lock().await;
let mut bound_clients_by_writer = HashMap::<u64, usize>::new();
let mut active_sessions_by_target_dc = HashMap::<i16, usize>::new();
for (writer_id, conn_ids) in &binding.conns_for_writer {
bound_clients_by_writer.insert(*writer_id, conn_ids.len());
}
for conn_meta in binding.meta.values() {
if conn_meta.target_dc == 0 {
continue;
}
*active_sessions_by_target_dc
.entry(conn_meta.target_dc)
.or_insert(0) += 1;
}
WriterActivitySnapshot { WriterActivitySnapshot {
bound_clients_by_writer, bound_clients_by_writer: self
active_sessions_by_target_dc, .binding
.bound_clients_by_writer
.iter()
.map(|entry| (*entry.key(), *entry.value()))
.collect(),
active_sessions_by_target_dc: self
.binding
.active_sessions_by_target_dc
.iter()
.map(|entry| (*entry.key(), *entry.value()))
.collect(),
} }
} }
@@ -393,10 +443,10 @@ impl ConnRegistry {
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> { pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
let mut binding = self.binding.inner.lock().await; let mut binding = self.binding.inner.lock().await;
binding.writers.remove(&writer_id);
self.writers.map.remove(&writer_id); self.writers.map.remove(&writer_id);
binding.last_meta_for_writer.remove(&writer_id); self.binding.last_meta_for_writer.remove(&writer_id);
binding.writer_idle_since_epoch_secs.remove(&writer_id); self.binding.writer_idle_since_epoch_secs.remove(&writer_id);
self.binding.bound_clients_by_writer.remove(&writer_id);
let conns = binding let conns = binding
.conns_for_writer .conns_for_writer
.remove(&writer_id) .remove(&writer_id)
@@ -410,6 +460,10 @@ impl ConnRegistry {
continue; continue;
} }
binding.writer_for_conn.remove(&conn_id); binding.writer_for_conn.remove(&conn_id);
let meta = binding.meta.remove(&conn_id);
if let Some(meta) = meta.as_ref() {
self.adjust_active_target_dc(meta.target_dc, -1);
}
let remove_hot = self let remove_hot = self
.hot_binding .hot_binding
.map .map
@@ -419,10 +473,10 @@ impl ConnRegistry {
if remove_hot { if remove_hot {
self.hot_binding.map.remove(&conn_id); self.hot_binding.map.remove(&conn_id);
} }
if let Some(m) = binding.meta.get(&conn_id) { if let Some(m) = meta {
out.push(BoundConn { out.push(BoundConn {
conn_id, conn_id,
meta: m.clone(), meta: m,
}); });
} }
} }
@@ -438,11 +492,10 @@ impl ConnRegistry {
} }
pub async fn is_writer_empty(&self, writer_id: u64) -> bool { pub async fn is_writer_empty(&self, writer_id: u64) -> bool {
let binding = self.binding.inner.lock().await; self.binding
binding .bound_clients_by_writer
.conns_for_writer
.get(&writer_id) .get(&writer_id)
.map(|s| s.is_empty()) .map(|count| *count.value() == 0)
.unwrap_or(true) .unwrap_or(true)
} }
@@ -457,21 +510,20 @@ impl ConnRegistry {
return false; return false;
} }
binding.writers.remove(&writer_id);
self.writers.map.remove(&writer_id); self.writers.map.remove(&writer_id);
binding.last_meta_for_writer.remove(&writer_id); self.binding.last_meta_for_writer.remove(&writer_id);
binding.writer_idle_since_epoch_secs.remove(&writer_id); self.binding.writer_idle_since_epoch_secs.remove(&writer_id);
self.binding.bound_clients_by_writer.remove(&writer_id);
binding.conns_for_writer.remove(&writer_id); binding.conns_for_writer.remove(&writer_id);
true true
} }
#[allow(dead_code)] #[allow(dead_code)]
pub(super) async fn non_empty_writer_ids(&self, writer_ids: &[u64]) -> HashSet<u64> { pub(super) async fn non_empty_writer_ids(&self, writer_ids: &[u64]) -> HashSet<u64> {
let binding = self.binding.inner.lock().await;
let mut out = HashSet::<u64>::with_capacity(writer_ids.len()); let mut out = HashSet::<u64>::with_capacity(writer_ids.len());
for writer_id in writer_ids { for writer_id in writer_ids {
if let Some(conns) = binding.conns_for_writer.get(writer_id) if let Some(count) = self.binding.bound_clients_by_writer.get(writer_id)
&& !conns.is_empty() && *count.value() > 0
{ {
out.insert(*writer_id); out.insert(*writer_id);
} }

View File

@@ -15,7 +15,6 @@ use super::registry::ConnMeta;
use super::wire::build_proxy_req_payload; use super::wire::build_proxy_req_payload;
use crate::config::{MeRouteNoWriterMode, MeWriterPickMode}; use crate::config::{MeRouteNoWriterMode, MeWriterPickMode};
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::network::IpFamily;
use crate::stream::PooledBuffer; use crate::stream::PooledBuffer;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
@@ -124,9 +123,8 @@ impl MePool {
} }
let mut writers_snapshot = { let mut writers_snapshot = {
let ws = self.writers.read().await; let ws = self.writers.snapshot();
if ws.is_empty() { if ws.is_empty() {
drop(ws);
match no_writer_mode { match no_writer_mode {
MeRouteNoWriterMode::AsyncRecoveryFailfast => { MeRouteNoWriterMode::AsyncRecoveryFailfast => {
let deadline = *no_writer_deadline.get_or_insert_with(|| { let deadline = *no_writer_deadline.get_or_insert_with(|| {
@@ -154,38 +152,32 @@ impl MePool {
for _ in for _ in
0..self.route_runtime.me_route_inline_recovery_attempts.max(1) 0..self.route_runtime.me_route_inline_recovery_attempts.max(1)
{ {
for family in self.family_order() { let preferred = self.preferred_endpoints_by_dc.load_full();
let map = match family { for (dc, addrs) in preferred.iter() {
IpFamily::V4 => self.proxy_map_v4.read().await.clone(), for addr in addrs {
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
};
for (dc, addrs) in &map {
for (ip, port) in addrs {
let addr = SocketAddr::new(*ip, *port);
let _ = self let _ = self
.connect_one_for_dc( .connect_one_for_dc(
addr, *addr,
*dc, *dc,
self.rng.as_ref(), self.rng.as_ref(),
) )
.await; .await;
} }
} }
} if !self.writers.snapshot().is_empty() {
if !self.writers.read().await.is_empty() {
break; break;
} }
} }
} }
if !self.writers.read().await.is_empty() { if !self.writers.snapshot().is_empty() {
continue; continue;
} }
let deadline = *no_writer_deadline.get_or_insert_with(|| { let deadline = *no_writer_deadline.get_or_insert_with(|| {
Instant::now() + self.route_runtime.me_route_inline_recovery_wait Instant::now() + self.route_runtime.me_route_inline_recovery_wait
}); });
if !self.wait_for_writer_until(deadline).await { if !self.wait_for_writer_until(deadline).await {
if !self.writers.read().await.is_empty() { if !self.writers.snapshot().is_empty() {
continue; continue;
} }
self.stats.increment_me_no_writer_failfast_total(); self.stats.increment_me_no_writer_failfast_total();
@@ -222,7 +214,7 @@ impl MePool {
} }
} }
} }
ws.clone() ws
}; };
let mut candidate_indices = self let mut candidate_indices = self
@@ -285,7 +277,12 @@ impl MePool {
)); ));
} }
emergency_attempts += 1; emergency_attempts += 1;
let mut endpoints = self.endpoint_candidates_for_target_dc(routed_dc).await; let mut endpoints = self
.preferred_endpoints_by_dc
.load()
.get(&routed_dc)
.cloned()
.unwrap_or_default();
endpoints.shuffle(&mut rand::rng()); endpoints.shuffle(&mut rand::rng());
for addr in endpoints { for addr in endpoints {
if self if self
@@ -298,9 +295,7 @@ impl MePool {
} }
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64)) tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64))
.await; .await;
let ws2 = self.writers.read().await; writers_snapshot = self.writers.snapshot();
writers_snapshot = ws2.clone();
drop(ws2);
candidate_indices = self candidate_indices = self
.candidate_indices_for_dc(&writers_snapshot, routed_dc, false) .candidate_indices_for_dc(&writers_snapshot, routed_dc, false)
.await; .await;

View File

@@ -1,13 +1,9 @@
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tracing::warn; use tracing::warn;
use crate::network::IpFamily;
use super::super::MePool; use super::super::MePool;
use super::{ use super::{
HYBRID_GLOBAL_BURST_PERIOD_ROUNDS, HYBRID_RECENT_SUCCESS_WINDOW_MS, HYBRID_GLOBAL_BURST_PERIOD_ROUNDS, HYBRID_RECENT_SUCCESS_WINDOW_MS,
@@ -17,18 +13,18 @@ use super::{
impl MePool { impl MePool {
pub(super) async fn wait_for_writer_until(&self, deadline: Instant) -> bool { pub(super) async fn wait_for_writer_until(&self, deadline: Instant) -> bool {
let mut rx = self.writer_epoch.subscribe(); let mut rx = self.writer_epoch.subscribe();
if !self.writers.read().await.is_empty() { if !self.writers.snapshot().is_empty() {
return true; return true;
} }
let now = Instant::now(); let now = Instant::now();
if now >= deadline { if now >= deadline {
return !self.writers.read().await.is_empty(); return !self.writers.snapshot().is_empty();
} }
let timeout = deadline.saturating_duration_since(now); let timeout = deadline.saturating_duration_since(now);
if tokio::time::timeout(timeout, rx.changed()).await.is_ok() { if tokio::time::timeout(timeout, rx.changed()).await.is_ok() {
return !self.writers.read().await.is_empty(); return !self.writers.snapshot().is_empty();
} }
!self.writers.read().await.is_empty() !self.writers.snapshot().is_empty()
} }
pub(super) async fn wait_for_candidate_until(&self, routed_dc: i32, deadline: Instant) -> bool { pub(super) async fn wait_for_candidate_until(&self, routed_dc: i32, deadline: Instant) -> bool {
@@ -58,11 +54,11 @@ impl MePool {
pub(super) async fn has_candidate_for_target_dc(&self, routed_dc: i32) -> bool { pub(super) async fn has_candidate_for_target_dc(&self, routed_dc: i32) -> bool {
let writers_snapshot = { let writers_snapshot = {
let ws = self.writers.read().await; let ws = self.writers.snapshot();
if ws.is_empty() { if ws.is_empty() {
return false; return false;
} }
ws.clone() ws
}; };
let mut candidate_indices = self let mut candidate_indices = self
.candidate_indices_for_dc(&writers_snapshot, routed_dc, false) .candidate_indices_for_dc(&writers_snapshot, routed_dc, false)
@@ -79,7 +75,7 @@ impl MePool {
self: &Arc<Self>, self: &Arc<Self>,
routed_dc: i32, routed_dc: i32,
) -> bool { ) -> bool {
let endpoints = self.endpoint_candidates_for_target_dc(routed_dc).await; let endpoints = self.preferred_endpoints_for_dc(routed_dc).await;
if endpoints.is_empty() { if endpoints.is_empty() {
return false; return false;
} }
@@ -92,32 +88,18 @@ impl MePool {
pub(super) async fn trigger_async_recovery_global(self: &Arc<Self>) { pub(super) async fn trigger_async_recovery_global(self: &Arc<Self>) {
self.stats.increment_me_async_recovery_trigger_total(); self.stats.increment_me_async_recovery_trigger_total();
let mut seen = HashSet::<(i32, SocketAddr)>::new(); let preferred = self.preferred_endpoints_by_dc.load();
for family in self.family_order() { let mut triggered = 0usize;
let map_guard = match family { for (dc, addrs) in preferred.iter() {
IpFamily::V4 => self.proxy_map_v4.read().await, for addr in addrs {
IpFamily::V6 => self.proxy_map_v6.read().await, self.trigger_immediate_refill_for_dc(*addr, *dc);
}; triggered = triggered.saturating_add(1);
for (dc, addrs) in map_guard.iter() { if triggered >= 8 {
for (ip, port) in addrs {
let addr = SocketAddr::new(*ip, *port);
if seen.insert((*dc, addr)) {
self.trigger_immediate_refill_for_dc(addr, *dc);
}
if seen.len() >= 8 {
return; return;
} }
} }
} }
} }
}
pub(super) async fn endpoint_candidates_for_target_dc(
&self,
routed_dc: i32,
) -> Vec<SocketAddr> {
self.preferred_endpoints_for_dc(routed_dc).await
}
pub(super) async fn maybe_trigger_hybrid_recovery( pub(super) async fn maybe_trigger_hybrid_recovery(
self: &Arc<Self>, self: &Arc<Self>,

View File

@@ -15,7 +15,10 @@ impl MePool {
routed_dc: i32, routed_dc: i32,
include_warm: bool, include_warm: bool,
) -> Vec<usize> { ) -> Vec<usize> {
let preferred = self.preferred_endpoints_for_dc(routed_dc).await; let preferred_snapshot = self.preferred_endpoints_by_dc.load();
let Some(preferred) = preferred_snapshot.get(&routed_dc) else {
return Vec::new();
};
if preferred.is_empty() { if preferred.is_empty() {
return Vec::new(); return Vec::new();
} }
@@ -25,7 +28,7 @@ impl MePool {
if !self.writer_eligible_for_selection(w, include_warm) { if !self.writer_eligible_for_selection(w, include_warm) {
continue; continue;
} }
if w.writer_dc == routed_dc && preferred.contains(&w.addr) { if w.writer_dc == routed_dc && preferred.binary_search(&w.addr).is_ok() {
out.push(idx); out.push(idx);
} }
} }