diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 22f40b5..b0ae394 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -320,7 +320,7 @@ impl MePool { pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), - default_dc: AtomicI32::new(default_dc.unwrap_or(0)), + default_dc: AtomicI32::new(default_dc.unwrap_or(2)), next_writer_id: AtomicU64::new(1), ping_tracker: Arc::new(Mutex::new(HashMap::new())), rtt_stats: Arc::new(Mutex::new(HashMap::new())), @@ -625,6 +625,58 @@ impl MePool { order } + pub(super) fn default_dc_for_routing(&self) -> i32 { + let dc = self.default_dc.load(Ordering::Relaxed); + if dc == 0 { 2 } else { dc } + } + + pub(super) fn dc_lookup_chain_for_target(&self, target_dc: i32) -> Vec { + let mut out = Vec::with_capacity(1); + if target_dc != 0 { + out.push(target_dc); + } else { + // Use default DC only when target DC is unknown and pinning is not established. + let fallback_dc = self.default_dc_for_routing(); + out.push(fallback_dc); + } + out + } + + pub(super) async fn resolve_dc_for_endpoint(&self, addr: SocketAddr) -> i32 { + let map_guard = if addr.is_ipv4() { + self.proxy_map_v4.read().await + } else { + self.proxy_map_v6.read().await + }; + + let mut matched_dc: Option = None; + let mut ambiguous = false; + for (dc, addrs) in map_guard.iter() { + if addrs + .iter() + .any(|(ip, port)| SocketAddr::new(*ip, *port) == addr) + { + match matched_dc { + None => matched_dc = Some(*dc), + Some(prev_dc) if prev_dc == *dc => {} + Some(_) => { + ambiguous = true; + break; + } + } + } + } + drop(map_guard); + + if !ambiguous + && let Some(dc) = matched_dc + { + return dc; + } + + self.default_dc_for_routing() + } + pub(super) async fn proxy_map_for_family( &self, family: IpFamily, diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index b442a8a..b9b1fd5 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -195,38 +195,25 @@ impl MePool { return Err(ProxyError::Proxy("No ME writers available for target DC".into())); } emergency_attempts += 1; - for family in self.family_order() { - let map_guard = match family { - IpFamily::V4 => self.proxy_map_v4.read().await, - IpFamily::V6 => self.proxy_map_v6.read().await, - }; - if let Some(addrs) = map_guard.get(&(target_dc as i32)) { - let mut shuffled = addrs.clone(); - shuffled.shuffle(&mut rand::rng()); - drop(map_guard); - for (ip, port) in shuffled { - let addr = SocketAddr::new(ip, port); - if self.connect_one(addr, self.rng.as_ref()).await.is_ok() { - break; - } - } - tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64)).await; - let ws2 = self.writers.read().await; - writers_snapshot = ws2.clone(); - drop(ws2); - candidate_indices = self - .candidate_indices_for_dc(&writers_snapshot, target_dc, false) - .await; - if candidate_indices.is_empty() { - candidate_indices = self - .candidate_indices_for_dc(&writers_snapshot, target_dc, true) - .await; - } - if !candidate_indices.is_empty() { - break; - } + let mut endpoints = self.endpoint_candidates_for_target_dc(target_dc).await; + endpoints.shuffle(&mut rand::rng()); + for addr in endpoints { + if self.connect_one(addr, self.rng.as_ref()).await.is_ok() { + break; } } + tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64)).await; + let ws2 = self.writers.read().await; + writers_snapshot = ws2.clone(); + drop(ws2); + candidate_indices = self + .candidate_indices_for_dc(&writers_snapshot, target_dc, false) + .await; + if candidate_indices.is_empty() { + candidate_indices = self + .candidate_indices_for_dc(&writers_snapshot, target_dc, true) + .await; + } if candidate_indices.is_empty() { return Err(ProxyError::Proxy("No ME writers available for target DC".into())); } @@ -458,26 +445,28 @@ impl MePool { let key = target_dc as i32; let mut preferred = Vec::::new(); let mut seen = HashSet::::new(); + let lookup_keys = self.dc_lookup_chain_for_target(key); for family in self.family_order() { let map = match family { IpFamily::V4 => self.proxy_map_v4.read().await.clone(), IpFamily::V6 => self.proxy_map_v6.read().await.clone(), }; - let mut lookup_keys = vec![key, key.abs(), -key.abs()]; - let def = self.default_dc.load(Ordering::Relaxed); - if def != 0 { - lookup_keys.push(def); - } - for lookup in lookup_keys { + let mut family_selected = Vec::::new(); + for lookup in lookup_keys.iter().copied() { if let Some(addrs) = map.get(&lookup) { for (ip, port) in addrs { - let addr = SocketAddr::new(*ip, *port); - if seen.insert(addr) { - preferred.push(addr); - } + family_selected.push(SocketAddr::new(*ip, *port)); } } + if !family_selected.is_empty() { + break; + } + } + for addr in family_selected { + if seen.insert(addr) { + preferred.push(addr); + } } if !preferred.is_empty() && !self.decision.effective_multipath { break; @@ -569,36 +558,23 @@ impl MePool { ) -> Vec { let key = target_dc as i32; let mut preferred = Vec::::new(); + let lookup_keys = self.dc_lookup_chain_for_target(key); for family in self.family_order() { let map_guard = match family { IpFamily::V4 => self.proxy_map_v4.read().await, IpFamily::V6 => self.proxy_map_v6.read().await, }; - - if let Some(v) = map_guard.get(&key) { - preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); - } - if preferred.is_empty() { - let abs = key.abs(); - if let Some(v) = map_guard.get(&abs) { - preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); - } - } - if preferred.is_empty() { - let abs = key.abs(); - if let Some(v) = map_guard.get(&-abs) { - preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); - } - } - if preferred.is_empty() { - let def = self.default_dc.load(Ordering::Relaxed); - if def != 0 - && let Some(v) = map_guard.get(&def) - { - preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); + let mut family_selected = Vec::::new(); + for lookup in lookup_keys.iter().copied() { + if let Some(v) = map_guard.get(&lookup) { + family_selected.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); + } + if !family_selected.is_empty() { + break; } } + preferred.extend(family_selected); drop(map_guard); @@ -608,9 +584,7 @@ impl MePool { } if preferred.is_empty() { - return (0..writers.len()) - .filter(|i| self.writer_eligible_for_selection(&writers[*i], include_warm)) - .collect(); + return Vec::new(); } let mut out = Vec::new(); @@ -622,11 +596,6 @@ impl MePool { out.push(idx); } } - if out.is_empty() { - return (0..writers.len()) - .filter(|i| self.writer_eligible_for_selection(&writers[*i], include_warm)) - .collect(); - } out }