ME Active-by-Endpoint

This commit is contained in:
Alexey 2026-03-08 03:04:27 +03:00
parent c08160600e
commit 6b3697ee87
No known key found for this signature in database
1 changed files with 27 additions and 2 deletions

View File

@ -1,4 +1,4 @@
use std::collections::HashSet; use std::collections::{HashMap, HashSet};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -113,10 +113,35 @@ impl MePool {
contour: WriterContour, contour: WriterContour,
allow_coverage_override: bool, allow_coverage_override: bool,
) -> bool { ) -> bool {
let candidates = self.connectable_endpoints(endpoints).await; let mut candidates = self.connectable_endpoints(endpoints).await;
if candidates.is_empty() { if candidates.is_empty() {
return false; return false;
} }
if candidates.len() > 1 {
let mut active_by_endpoint = HashMap::<SocketAddr, usize>::new();
let ws = self.writers.read().await;
for writer in ws.iter() {
if writer.draining.load(Ordering::Relaxed) {
continue;
}
if writer.writer_dc != dc {
continue;
}
if !matches!(
super::pool::WriterContour::from_u8(
writer.contour.load(Ordering::Relaxed),
),
super::pool::WriterContour::Active
) {
continue;
}
if candidates.contains(&writer.addr) {
*active_by_endpoint.entry(writer.addr).or_insert(0) += 1;
}
}
drop(ws);
candidates.sort_by_key(|addr| (active_by_endpoint.get(addr).copied().unwrap_or(0), *addr));
}
let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % candidates.len(); let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % candidates.len();
for offset in 0..candidates.len() { for offset in 0..candidates.len() {
let idx = (start + offset) % candidates.len(); let idx = (start + offset) % candidates.len();