diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 544d048..fc916f4 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::Ordering; @@ -113,10 +113,35 @@ impl MePool { contour: WriterContour, allow_coverage_override: bool, ) -> bool { - let candidates = self.connectable_endpoints(endpoints).await; + let mut candidates = self.connectable_endpoints(endpoints).await; if candidates.is_empty() { return false; } + if candidates.len() > 1 { + let mut active_by_endpoint = HashMap::::new(); + let ws = self.writers.read().await; + for writer in ws.iter() { + if writer.draining.load(Ordering::Relaxed) { + continue; + } + if writer.writer_dc != dc { + continue; + } + if !matches!( + super::pool::WriterContour::from_u8( + writer.contour.load(Ordering::Relaxed), + ), + super::pool::WriterContour::Active + ) { + continue; + } + if candidates.contains(&writer.addr) { + *active_by_endpoint.entry(writer.addr).or_insert(0) += 1; + } + } + drop(ws); + candidates.sort_by_key(|addr| (active_by_endpoint.get(addr).copied().unwrap_or(0), *addr)); + } let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % candidates.len(); for offset in 0..candidates.len() { let idx = (start + offset) % candidates.len();