use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU8, AtomicU32, AtomicU64, AtomicUsize, Ordering}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::{Mutex, Notify, RwLock, mpsc}; use tokio_util::sync::CancellationToken; use crate::config::{MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy}; use crate::crypto::SecureRandom; use crate::network::IpFamily; use crate::network::probe::NetworkDecision; use crate::transport::UpstreamManager; use super::ConnRegistry; use super::codec::WriterCommand; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub(super) struct RefillDcKey { pub dc: i32, pub family: IpFamily, } #[derive(Clone)] pub struct MeWriter { pub id: u64, pub addr: SocketAddr, pub generation: u64, pub contour: Arc, pub created_at: Instant, pub tx: mpsc::Sender, pub cancel: CancellationToken, pub degraded: Arc, pub draining: Arc, pub draining_started_at_epoch_secs: Arc, pub allow_drain_fallback: Arc, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub(super) enum WriterContour { Warm = 0, Active = 1, Draining = 2, } impl WriterContour { pub(super) fn as_u8(self) -> u8 { self as u8 } pub(super) fn from_u8(value: u8) -> Self { match value { 0 => Self::Warm, 1 => Self::Active, 2 => Self::Draining, _ => Self::Draining, } } } #[derive(Debug, Clone)] pub struct SecretSnapshot { pub epoch: u64, pub key_selector: u32, pub secret: Vec, } #[allow(dead_code)] pub struct MePool { pub(super) registry: Arc, pub(super) writers: Arc>>, pub(super) rr: AtomicU64, pub(super) decision: NetworkDecision, pub(super) upstream: Option>, pub(super) rng: Arc, pub(super) proxy_tag: Option>, pub(super) proxy_secret: Arc>, pub(super) nat_ip_cfg: Option, pub(super) nat_ip_detected: Arc>>, pub(super) nat_probe: bool, pub(super) nat_stun: Option, pub(super) nat_stun_servers: Vec, pub(super) nat_stun_live_servers: Arc>>, pub(super) nat_probe_concurrency: usize, pub(super) detected_ipv6: Option, pub(super) nat_probe_attempts: std::sync::atomic::AtomicU8, pub(super) nat_probe_disabled: std::sync::atomic::AtomicBool, pub(super) stun_backoff_until: Arc>>, pub(super) me_one_retry: u8, pub(super) me_one_timeout: Duration, pub(super) me_keepalive_enabled: bool, pub(super) me_keepalive_interval: Duration, pub(super) me_keepalive_jitter: Duration, pub(super) me_keepalive_payload_random: bool, pub(super) rpc_proxy_req_every_secs: AtomicU64, pub(super) me_warmup_stagger_enabled: bool, pub(super) me_warmup_step_delay: Duration, pub(super) me_warmup_step_jitter: Duration, pub(super) me_reconnect_max_concurrent_per_dc: u32, pub(super) me_reconnect_backoff_base: Duration, pub(super) me_reconnect_backoff_cap: Duration, pub(super) me_reconnect_fast_retry_count: u32, pub(super) me_single_endpoint_shadow_writers: AtomicU8, pub(super) me_single_endpoint_outage_mode_enabled: AtomicBool, pub(super) me_single_endpoint_outage_disable_quarantine: AtomicBool, pub(super) me_single_endpoint_outage_backoff_min_ms: AtomicU64, pub(super) me_single_endpoint_outage_backoff_max_ms: AtomicU64, pub(super) me_single_endpoint_shadow_rotate_every_secs: AtomicU64, pub(super) me_floor_mode: AtomicU8, pub(super) me_adaptive_floor_idle_secs: AtomicU64, pub(super) me_adaptive_floor_min_writers_single_endpoint: AtomicU8, pub(super) me_adaptive_floor_recover_grace_secs: AtomicU64, pub(super) proxy_map_v4: Arc>>>, pub(super) proxy_map_v6: Arc>>>, pub(super) default_dc: AtomicI32, pub(super) next_writer_id: AtomicU64, pub(super) ping_tracker: Arc>>, pub(super) rtt_stats: Arc>>, pub(super) nat_reflection_cache: Arc>, pub(super) nat_reflection_singleflight_v4: Arc>, pub(super) nat_reflection_singleflight_v6: Arc>, pub(super) writer_available: Arc, pub(super) refill_inflight: Arc>>, pub(super) refill_inflight_dc: Arc>>, pub(super) conn_count: AtomicUsize, pub(super) stats: Arc, pub(super) generation: AtomicU64, pub(super) active_generation: AtomicU64, pub(super) warm_generation: AtomicU64, pub(super) pending_hardswap_generation: AtomicU64, pub(super) pending_hardswap_started_at_epoch_secs: AtomicU64, pub(super) pending_hardswap_map_hash: AtomicU64, pub(super) hardswap: AtomicBool, pub(super) endpoint_quarantine: Arc>>, pub(super) kdf_material_fingerprint: Arc>>, pub(super) me_pool_drain_ttl_secs: AtomicU64, pub(super) me_pool_force_close_secs: AtomicU64, pub(super) me_pool_min_fresh_ratio_permille: AtomicU32, pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64, pub(super) me_hardswap_warmup_delay_max_ms: AtomicU64, pub(super) me_hardswap_warmup_extra_passes: AtomicU32, pub(super) me_hardswap_warmup_pass_backoff_base_ms: AtomicU64, pub(super) me_bind_stale_mode: AtomicU8, pub(super) me_bind_stale_ttl_secs: AtomicU64, pub(super) secret_atomic_snapshot: AtomicBool, pub(super) me_deterministic_writer_sort: AtomicBool, pub(super) me_socks_kdf_policy: AtomicU8, pub(super) me_route_no_writer_mode: AtomicU8, pub(super) me_route_no_writer_wait: Duration, pub(super) me_route_inline_recovery_attempts: u32, pub(super) me_route_inline_recovery_wait: Duration, pub(super) runtime_ready: AtomicBool, pool_size: usize, } #[derive(Debug, Default)] pub struct NatReflectionCache { pub v4: Option<(std::time::Instant, std::net::SocketAddr)>, pub v6: Option<(std::time::Instant, std::net::SocketAddr)>, } impl MePool { fn ratio_to_permille(ratio: f32) -> u32 { let clamped = ratio.clamp(0.0, 1.0); (clamped * 1000.0).round() as u32 } pub(super) fn permille_to_ratio(permille: u32) -> f32 { (permille.min(1000) as f32) / 1000.0 } pub(super) fn now_epoch_secs() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs() } pub fn new( proxy_tag: Option>, proxy_secret: Vec, nat_ip: Option, nat_probe: bool, nat_stun: Option, nat_stun_servers: Vec, nat_probe_concurrency: usize, detected_ipv6: Option, me_one_retry: u8, me_one_timeout_ms: u64, proxy_map_v4: HashMap>, proxy_map_v6: HashMap>, default_dc: Option, decision: NetworkDecision, upstream: Option>, rng: Arc, stats: Arc, me_keepalive_enabled: bool, me_keepalive_interval_secs: u64, me_keepalive_jitter_secs: u64, me_keepalive_payload_random: bool, rpc_proxy_req_every_secs: u64, me_warmup_stagger_enabled: bool, me_warmup_step_delay_ms: u64, me_warmup_step_jitter_ms: u64, me_reconnect_max_concurrent_per_dc: u32, me_reconnect_backoff_base_ms: u64, me_reconnect_backoff_cap_ms: u64, me_reconnect_fast_retry_count: u32, me_single_endpoint_shadow_writers: u8, me_single_endpoint_outage_mode_enabled: bool, me_single_endpoint_outage_disable_quarantine: bool, me_single_endpoint_outage_backoff_min_ms: u64, me_single_endpoint_outage_backoff_max_ms: u64, me_single_endpoint_shadow_rotate_every_secs: u64, me_floor_mode: MeFloorMode, me_adaptive_floor_idle_secs: u64, me_adaptive_floor_min_writers_single_endpoint: u8, me_adaptive_floor_recover_grace_secs: u64, hardswap: bool, me_pool_drain_ttl_secs: u64, me_pool_force_close_secs: u64, me_pool_min_fresh_ratio: f32, me_hardswap_warmup_delay_min_ms: u64, me_hardswap_warmup_delay_max_ms: u64, me_hardswap_warmup_extra_passes: u8, me_hardswap_warmup_pass_backoff_base_ms: u64, me_bind_stale_mode: MeBindStaleMode, me_bind_stale_ttl_secs: u64, me_secret_atomic_snapshot: bool, me_deterministic_writer_sort: bool, me_socks_kdf_policy: MeSocksKdfPolicy, me_route_backpressure_base_timeout_ms: u64, me_route_backpressure_high_timeout_ms: u64, me_route_backpressure_high_watermark_pct: u8, me_route_no_writer_mode: MeRouteNoWriterMode, me_route_no_writer_wait_ms: u64, me_route_inline_recovery_attempts: u32, me_route_inline_recovery_wait_ms: u64, ) -> Arc { let registry = Arc::new(ConnRegistry::new()); registry.update_route_backpressure_policy( me_route_backpressure_base_timeout_ms, me_route_backpressure_high_timeout_ms, me_route_backpressure_high_watermark_pct, ); Arc::new(Self { registry, writers: Arc::new(RwLock::new(Vec::new())), rr: AtomicU64::new(0), decision, upstream, rng, proxy_tag, proxy_secret: Arc::new(RwLock::new(SecretSnapshot { epoch: 1, key_selector: if proxy_secret.len() >= 4 { u32::from_le_bytes([ proxy_secret[0], proxy_secret[1], proxy_secret[2], proxy_secret[3], ]) } else { 0 }, secret: proxy_secret, })), nat_ip_cfg: nat_ip, nat_ip_detected: Arc::new(RwLock::new(None)), nat_probe, nat_stun, nat_stun_servers, nat_stun_live_servers: Arc::new(RwLock::new(Vec::new())), nat_probe_concurrency: nat_probe_concurrency.max(1), detected_ipv6, nat_probe_attempts: std::sync::atomic::AtomicU8::new(0), nat_probe_disabled: std::sync::atomic::AtomicBool::new(false), stun_backoff_until: Arc::new(RwLock::new(None)), me_one_retry, me_one_timeout: Duration::from_millis(me_one_timeout_ms), stats, me_keepalive_enabled, me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs), me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs), me_keepalive_payload_random, rpc_proxy_req_every_secs: AtomicU64::new(rpc_proxy_req_every_secs), me_warmup_stagger_enabled, me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms), me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms), me_reconnect_max_concurrent_per_dc, me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms), me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms), me_reconnect_fast_retry_count, me_single_endpoint_shadow_writers: AtomicU8::new(me_single_endpoint_shadow_writers), me_single_endpoint_outage_mode_enabled: AtomicBool::new( me_single_endpoint_outage_mode_enabled, ), me_single_endpoint_outage_disable_quarantine: AtomicBool::new( me_single_endpoint_outage_disable_quarantine, ), me_single_endpoint_outage_backoff_min_ms: AtomicU64::new( me_single_endpoint_outage_backoff_min_ms, ), me_single_endpoint_outage_backoff_max_ms: AtomicU64::new( me_single_endpoint_outage_backoff_max_ms, ), me_single_endpoint_shadow_rotate_every_secs: AtomicU64::new( me_single_endpoint_shadow_rotate_every_secs, ), me_floor_mode: AtomicU8::new(me_floor_mode.as_u8()), me_adaptive_floor_idle_secs: AtomicU64::new(me_adaptive_floor_idle_secs), me_adaptive_floor_min_writers_single_endpoint: AtomicU8::new( me_adaptive_floor_min_writers_single_endpoint, ), me_adaptive_floor_recover_grace_secs: AtomicU64::new( me_adaptive_floor_recover_grace_secs, ), pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), default_dc: AtomicI32::new(default_dc.unwrap_or(2)), next_writer_id: AtomicU64::new(1), ping_tracker: Arc::new(Mutex::new(HashMap::new())), rtt_stats: Arc::new(Mutex::new(HashMap::new())), nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())), nat_reflection_singleflight_v4: Arc::new(Mutex::new(())), nat_reflection_singleflight_v6: Arc::new(Mutex::new(())), writer_available: Arc::new(Notify::new()), refill_inflight: Arc::new(Mutex::new(HashSet::new())), refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())), conn_count: AtomicUsize::new(0), generation: AtomicU64::new(1), active_generation: AtomicU64::new(1), warm_generation: AtomicU64::new(0), pending_hardswap_generation: AtomicU64::new(0), pending_hardswap_started_at_epoch_secs: AtomicU64::new(0), pending_hardswap_map_hash: AtomicU64::new(0), hardswap: AtomicBool::new(hardswap), endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs), me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille( me_pool_min_fresh_ratio, )), me_hardswap_warmup_delay_min_ms: AtomicU64::new(me_hardswap_warmup_delay_min_ms), me_hardswap_warmup_delay_max_ms: AtomicU64::new(me_hardswap_warmup_delay_max_ms), me_hardswap_warmup_extra_passes: AtomicU32::new(me_hardswap_warmup_extra_passes as u32), me_hardswap_warmup_pass_backoff_base_ms: AtomicU64::new( me_hardswap_warmup_pass_backoff_base_ms, ), me_bind_stale_mode: AtomicU8::new(me_bind_stale_mode.as_u8()), me_bind_stale_ttl_secs: AtomicU64::new(me_bind_stale_ttl_secs), secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot), me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort), me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()), me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), me_route_inline_recovery_attempts, me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms), runtime_ready: AtomicBool::new(false), }) } pub fn current_generation(&self) -> u64 { self.active_generation.load(Ordering::Relaxed) } pub fn set_runtime_ready(&self, ready: bool) { self.runtime_ready.store(ready, Ordering::Relaxed); } pub fn is_runtime_ready(&self) -> bool { self.runtime_ready.load(Ordering::Relaxed) } pub fn update_runtime_reinit_policy( &self, hardswap: bool, drain_ttl_secs: u64, force_close_secs: u64, min_fresh_ratio: f32, hardswap_warmup_delay_min_ms: u64, hardswap_warmup_delay_max_ms: u64, hardswap_warmup_extra_passes: u8, hardswap_warmup_pass_backoff_base_ms: u64, bind_stale_mode: MeBindStaleMode, bind_stale_ttl_secs: u64, secret_atomic_snapshot: bool, deterministic_writer_sort: bool, single_endpoint_shadow_writers: u8, single_endpoint_outage_mode_enabled: bool, single_endpoint_outage_disable_quarantine: bool, single_endpoint_outage_backoff_min_ms: u64, single_endpoint_outage_backoff_max_ms: u64, single_endpoint_shadow_rotate_every_secs: u64, floor_mode: MeFloorMode, adaptive_floor_idle_secs: u64, adaptive_floor_min_writers_single_endpoint: u8, adaptive_floor_recover_grace_secs: u64, ) { self.hardswap.store(hardswap, Ordering::Relaxed); self.me_pool_drain_ttl_secs .store(drain_ttl_secs, Ordering::Relaxed); self.me_pool_force_close_secs .store(force_close_secs, Ordering::Relaxed); self.me_pool_min_fresh_ratio_permille .store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed); self.me_hardswap_warmup_delay_min_ms .store(hardswap_warmup_delay_min_ms, Ordering::Relaxed); self.me_hardswap_warmup_delay_max_ms .store(hardswap_warmup_delay_max_ms, Ordering::Relaxed); self.me_hardswap_warmup_extra_passes .store(hardswap_warmup_extra_passes as u32, Ordering::Relaxed); self.me_hardswap_warmup_pass_backoff_base_ms .store(hardswap_warmup_pass_backoff_base_ms, Ordering::Relaxed); self.me_bind_stale_mode .store(bind_stale_mode.as_u8(), Ordering::Relaxed); self.me_bind_stale_ttl_secs .store(bind_stale_ttl_secs, Ordering::Relaxed); self.secret_atomic_snapshot .store(secret_atomic_snapshot, Ordering::Relaxed); self.me_deterministic_writer_sort .store(deterministic_writer_sort, Ordering::Relaxed); self.me_single_endpoint_shadow_writers .store(single_endpoint_shadow_writers, Ordering::Relaxed); self.me_single_endpoint_outage_mode_enabled .store(single_endpoint_outage_mode_enabled, Ordering::Relaxed); self.me_single_endpoint_outage_disable_quarantine .store(single_endpoint_outage_disable_quarantine, Ordering::Relaxed); self.me_single_endpoint_outage_backoff_min_ms .store(single_endpoint_outage_backoff_min_ms, Ordering::Relaxed); self.me_single_endpoint_outage_backoff_max_ms .store(single_endpoint_outage_backoff_max_ms, Ordering::Relaxed); self.me_single_endpoint_shadow_rotate_every_secs .store(single_endpoint_shadow_rotate_every_secs, Ordering::Relaxed); let previous_floor_mode = self.floor_mode(); self.me_floor_mode .store(floor_mode.as_u8(), Ordering::Relaxed); self.me_adaptive_floor_idle_secs .store(adaptive_floor_idle_secs, Ordering::Relaxed); self.me_adaptive_floor_min_writers_single_endpoint .store(adaptive_floor_min_writers_single_endpoint, Ordering::Relaxed); self.me_adaptive_floor_recover_grace_secs .store(adaptive_floor_recover_grace_secs, Ordering::Relaxed); if previous_floor_mode != floor_mode { self.stats.increment_me_floor_mode_switch_total(); match (previous_floor_mode, floor_mode) { (MeFloorMode::Static, MeFloorMode::Adaptive) => { self.stats .increment_me_floor_mode_switch_static_to_adaptive_total(); } (MeFloorMode::Adaptive, MeFloorMode::Static) => { self.stats .increment_me_floor_mode_switch_adaptive_to_static_total(); } _ => {} } } } pub fn reset_stun_state(&self) { self.nat_probe_attempts.store(0, Ordering::Relaxed); self.nat_probe_disabled.store(false, Ordering::Relaxed); if let Ok(mut live) = self.nat_stun_live_servers.try_write() { live.clear(); } } pub fn translate_our_addr(&self, addr: SocketAddr) -> SocketAddr { let ip = self.translate_ip_for_nat(addr.ip()); SocketAddr::new(ip, addr.port()) } pub fn registry(&self) -> &Arc { &self.registry } pub fn update_runtime_transport_policy( &self, socks_kdf_policy: MeSocksKdfPolicy, route_backpressure_base_timeout_ms: u64, route_backpressure_high_timeout_ms: u64, route_backpressure_high_watermark_pct: u8, ) { self.me_socks_kdf_policy .store(socks_kdf_policy.as_u8(), Ordering::Relaxed); self.registry.update_route_backpressure_policy( route_backpressure_base_timeout_ms, route_backpressure_high_timeout_ms, route_backpressure_high_watermark_pct, ); } pub(super) fn socks_kdf_policy(&self) -> MeSocksKdfPolicy { MeSocksKdfPolicy::from_u8(self.me_socks_kdf_policy.load(Ordering::Relaxed)) } pub(super) fn writers_arc(&self) -> Arc>> { self.writers.clone() } pub(super) fn force_close_timeout(&self) -> Option { let secs = self.me_pool_force_close_secs.load(Ordering::Relaxed); if secs == 0 { None } else { Some(Duration::from_secs(secs)) } } pub(super) async fn key_selector(&self) -> u32 { self.proxy_secret.read().await.key_selector } pub(super) async fn secret_snapshot(&self) -> SecretSnapshot { self.proxy_secret.read().await.clone() } pub(super) fn bind_stale_mode(&self) -> MeBindStaleMode { MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed)) } pub(super) fn required_writers_for_dc(&self, endpoint_count: usize) -> usize { if endpoint_count == 0 { return 0; } if endpoint_count == 1 { let shadow = self .me_single_endpoint_shadow_writers .load(Ordering::Relaxed) as usize; return (1 + shadow).max(3); } endpoint_count.max(3) } pub(super) fn floor_mode(&self) -> MeFloorMode { MeFloorMode::from_u8(self.me_floor_mode.load(Ordering::Relaxed)) } pub(super) fn adaptive_floor_idle_duration(&self) -> Duration { Duration::from_secs(self.me_adaptive_floor_idle_secs.load(Ordering::Relaxed)) } pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration { Duration::from_secs( self.me_adaptive_floor_recover_grace_secs .load(Ordering::Relaxed), ) } pub(super) fn required_writers_for_dc_with_floor_mode( &self, endpoint_count: usize, reduce_for_idle: bool, ) -> usize { let base_required = self.required_writers_for_dc(endpoint_count); if !reduce_for_idle { return base_required; } if endpoint_count != 1 || self.floor_mode() != MeFloorMode::Adaptive { return base_required; } let min_writers = (self .me_adaptive_floor_min_writers_single_endpoint .load(Ordering::Relaxed) as usize) .max(1); base_required.min(min_writers) } pub(super) fn single_endpoint_outage_mode_enabled(&self) -> bool { self.me_single_endpoint_outage_mode_enabled .load(Ordering::Relaxed) } pub(super) fn single_endpoint_outage_disable_quarantine(&self) -> bool { self.me_single_endpoint_outage_disable_quarantine .load(Ordering::Relaxed) } pub(super) fn single_endpoint_outage_backoff_bounds_ms(&self) -> (u64, u64) { let min_ms = self .me_single_endpoint_outage_backoff_min_ms .load(Ordering::Relaxed); let max_ms = self .me_single_endpoint_outage_backoff_max_ms .load(Ordering::Relaxed); if min_ms <= max_ms { (min_ms, max_ms) } else { (max_ms, min_ms) } } pub(super) fn single_endpoint_shadow_rotate_interval(&self) -> Option { let secs = self .me_single_endpoint_shadow_rotate_every_secs .load(Ordering::Relaxed); if secs == 0 { None } else { Some(Duration::from_secs(secs)) } } pub(super) fn family_order(&self) -> Vec { let mut order = Vec::new(); if self.decision.prefer_ipv6() { if self.decision.ipv6_me { order.push(IpFamily::V6); } if self.decision.ipv4_me { order.push(IpFamily::V4); } } else { if self.decision.ipv4_me { order.push(IpFamily::V4); } if self.decision.ipv6_me { order.push(IpFamily::V6); } } order } pub(super) fn default_dc_for_routing(&self) -> i32 { let dc = self.default_dc.load(Ordering::Relaxed); if dc == 0 { 2 } else { dc } } pub(super) fn dc_lookup_chain_for_target(&self, target_dc: i32) -> Vec { let mut out = Vec::with_capacity(1); if target_dc != 0 { out.push(target_dc); } else { // Use default DC only when target DC is unknown and pinning is not established. let fallback_dc = self.default_dc_for_routing(); out.push(fallback_dc); } out } pub(super) async fn resolve_dc_for_endpoint(&self, addr: SocketAddr) -> i32 { let map_guard = if addr.is_ipv4() { self.proxy_map_v4.read().await } else { self.proxy_map_v6.read().await }; let mut matched_dc: Option = None; let mut ambiguous = false; for (dc, addrs) in map_guard.iter() { if addrs .iter() .any(|(ip, port)| SocketAddr::new(*ip, *port) == addr) { match matched_dc { None => matched_dc = Some(*dc), Some(prev_dc) if prev_dc == *dc => {} Some(_) => { ambiguous = true; break; } } } } drop(map_guard); if !ambiguous && let Some(dc) = matched_dc { return dc; } self.default_dc_for_routing() } pub(super) async fn proxy_map_for_family( &self, family: IpFamily, ) -> HashMap> { match family { IpFamily::V4 => self.proxy_map_v4.read().await.clone(), IpFamily::V6 => self.proxy_map_v6.read().await.clone(), } } }