New reroute algo + flush() optimized + new IPV6 Parser

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-02-18 19:08:27 +03:00
parent b84189b21b
commit df4494c37a
15 changed files with 1209 additions and 1058 deletions

View File

@@ -174,7 +174,11 @@ impl RpcWriter {
if buf.len() >= 16 {
self.iv.copy_from_slice(&buf[buf.len() - 16..]);
}
self.writer.write_all(&buf).await.map_err(ProxyError::Io)?;
self.writer.write_all(&buf).await.map_err(ProxyError::Io)
}
pub(crate) async fn send_and_flush(&mut self, payload: &[u8]) -> Result<()> {
self.send(payload).await?;
self.writer.flush().await.map_err(ProxyError::Io)
}
}

View File

@@ -3,7 +3,6 @@ use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
use regex::Regex;
use httpdate;
use tracing::{debug, info, warn};
@@ -20,6 +19,45 @@ pub struct ProxyConfigData {
pub default_dc: Option<i32>,
}
fn parse_host_port(s: &str) -> Option<(IpAddr, u16)> {
if let Some(bracket_end) = s.rfind(']') {
if s.starts_with('[') && bracket_end + 1 < s.len() && s.as_bytes().get(bracket_end + 1) == Some(&b':') {
let host = &s[1..bracket_end];
let port_str = &s[bracket_end + 2..];
let ip = host.parse::<IpAddr>().ok()?;
let port = port_str.parse::<u16>().ok()?;
return Some((ip, port));
}
}
let idx = s.rfind(':')?;
let host = &s[..idx];
let port_str = &s[idx + 1..];
let ip = host.parse::<IpAddr>().ok()?;
let port = port_str.parse::<u16>().ok()?;
Some((ip, port))
}
fn parse_proxy_line(line: &str) -> Option<(i32, IpAddr, u16)> {
// Accepts lines like:
// proxy_for 4 91.108.4.195:8888;
// proxy_for 2 [2001:67c:04e8:f002::d]:80;
// proxy_for 2 2001:67c:04e8:f002::d:80;
let trimmed = line.trim();
if !trimmed.starts_with("proxy_for") {
return None;
}
// Capture everything between dc and trailing ';'
let without_prefix = trimmed.trim_start_matches("proxy_for").trim();
let mut parts = without_prefix.split_whitespace();
let dc_str = parts.next()?;
let rest = parts.next()?;
let host_port = rest.trim_end_matches(';');
let dc = dc_str.parse::<i32>().ok()?;
let (ip, port) = parse_host_port(host_port)?;
Some((dc, ip, port))
}
pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
let resp = reqwest::get(url)
.await
@@ -48,26 +86,26 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
.await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?;
let re_proxy = Regex::new(r"proxy_for\s+(-?\d+)\s+([^\s:]+):(\d+)\s*;").unwrap();
let re_default = Regex::new(r"default\s+(-?\d+)\s*;").unwrap();
let mut map: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new();
for cap in re_proxy.captures_iter(&text) {
if let (Some(dc), Some(host), Some(port)) = (cap.get(1), cap.get(2), cap.get(3)) {
if let Ok(dc_idx) = dc.as_str().parse::<i32>() {
if let Ok(ip) = host.as_str().parse::<IpAddr>() {
if let Ok(port_num) = port.as_str().parse::<u16>() {
map.entry(dc_idx).or_default().push((ip, port_num));
}
}
}
for line in text.lines() {
if let Some((dc, ip, port)) = parse_proxy_line(line) {
map.entry(dc).or_default().push((ip, port));
}
}
let default_dc = re_default
.captures(&text)
.and_then(|c| c.get(1))
.and_then(|m| m.as_str().parse::<i32>().ok());
let default_dc = text
.lines()
.find_map(|l| {
let t = l.trim();
if let Some(rest) = t.strip_prefix("default") {
return rest
.trim()
.trim_end_matches(';')
.parse::<i32>()
.ok();
}
None
});
Ok(ProxyConfigData { map, default_dc })
}
@@ -111,3 +149,35 @@ pub async fn me_config_updater(pool: Arc<MePool>, rng: Arc<SecureRandom>, interv
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_ipv6_bracketed() {
let line = "proxy_for 2 [2001:67c:04e8:f002::d]:80;";
let res = parse_proxy_line(line).unwrap();
assert_eq!(res.0, 2);
assert_eq!(res.1, "2001:67c:04e8:f002::d".parse::<IpAddr>().unwrap());
assert_eq!(res.2, 80);
}
#[test]
fn parse_ipv6_plain() {
let line = "proxy_for 2 2001:67c:04e8:f002::d:80;";
let res = parse_proxy_line(line).unwrap();
assert_eq!(res.0, 2);
assert_eq!(res.1, "2001:67c:04e8:f002::d".parse::<IpAddr>().unwrap());
assert_eq!(res.2, 80);
}
#[test]
fn parse_ipv4() {
let line = "proxy_for 4 91.108.4.195:8888;";
let res = parse_proxy_line(line).unwrap();
assert_eq!(res.0, 4);
assert_eq!(res.1, "91.108.4.195".parse::<IpAddr>().unwrap());
assert_eq!(res.2, 8888);
}
}

View File

@@ -10,7 +10,7 @@ use std::os::raw::c_int;
use bytes::BytesMut;
use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::net::{TcpStream, TcpSocket};
use tokio::time::timeout;
use tracing::{debug, info, warn};
@@ -44,7 +44,28 @@ impl MePool {
/// TCP connect with timeout + return RTT in milliseconds.
pub(crate) async fn connect_tcp(&self, addr: SocketAddr) -> Result<(TcpStream, f64)> {
let start = Instant::now();
let stream = timeout(Duration::from_secs(ME_CONNECT_TIMEOUT_SECS), TcpStream::connect(addr))
let connect_fut = async {
if addr.is_ipv6() {
if let Some(v6) = self.detected_ipv6 {
match TcpSocket::new_v6() {
Ok(sock) => {
if let Err(e) = sock.bind(SocketAddr::new(IpAddr::V6(v6), 0)) {
debug!(error = %e, bind_ip = %v6, "ME IPv6 bind failed, falling back to default bind");
} else {
match sock.connect(addr).await {
Ok(stream) => return Ok(stream),
Err(e) => debug!(error = %e, target = %addr, "ME IPv6 bound connect failed, retrying default connect"),
}
}
}
Err(e) => debug!(error = %e, "ME IPv6 socket creation failed, falling back to default connect"),
}
}
}
TcpStream::connect(addr).await
};
let stream = timeout(Duration::from_secs(ME_CONNECT_TIMEOUT_SECS), connect_fut)
.await
.map_err(|_| ProxyError::ConnectionTimeout { addr: addr.to_string() })??;
let connect_ms = start.elapsed().as_secs_f64() * 1000.0;

View File

@@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering};
use bytes::BytesMut;
@@ -32,6 +32,7 @@ pub struct MeWriter {
pub writer: Arc<Mutex<RpcWriter>>,
pub cancel: CancellationToken,
pub degraded: Arc<AtomicBool>,
pub draining: Arc<AtomicBool>,
}
pub struct MePool {
@@ -46,6 +47,7 @@ pub struct MePool {
pub(super) nat_ip_detected: Arc<RwLock<Option<IpAddr>>>,
pub(super) nat_probe: bool,
pub(super) nat_stun: Option<String>,
pub(super) detected_ipv6: Option<Ipv6Addr>,
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) default_dc: AtomicI32,
@@ -69,6 +71,7 @@ impl MePool {
nat_ip: Option<IpAddr>,
nat_probe: bool,
nat_stun: Option<String>,
detected_ipv6: Option<Ipv6Addr>,
proxy_map_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
proxy_map_v6: HashMap<i32, Vec<(IpAddr, u16)>>,
default_dc: Option<i32>,
@@ -87,6 +90,7 @@ impl MePool {
nat_ip_detected: Arc::new(RwLock::new(None)),
nat_probe,
nat_stun,
detected_ipv6,
pool_size: 2,
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
@@ -294,6 +298,7 @@ impl MePool {
let writer_id = self.next_writer_id.fetch_add(1, Ordering::Relaxed);
let cancel = CancellationToken::new();
let degraded = Arc::new(AtomicBool::new(false));
let draining = Arc::new(AtomicBool::new(false));
let rpc_w = Arc::new(Mutex::new(RpcWriter {
writer: hs.wr,
key: hs.write_key,
@@ -306,6 +311,7 @@ impl MePool {
writer: rpc_w.clone(),
cancel: cancel.clone(),
degraded: degraded.clone(),
draining: draining.clone(),
};
self.writers.write().await.push(writer.clone());
@@ -336,7 +342,7 @@ impl MePool {
)
.await;
if let Some(pool) = pool.upgrade() {
pool.remove_writer_and_reroute(writer_id).await;
pool.remove_writer_and_close_clients(writer_id).await;
}
if let Err(e) = res {
warn!(error = %e, "ME reader ended");
@@ -368,11 +374,11 @@ impl MePool {
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
}
ping_id = ping_id.wrapping_add(1);
if let Err(e) = rpc_w_ping.lock().await.send(&p).await {
if let Err(e) = rpc_w_ping.lock().await.send_and_flush(&p).await {
debug!(error = %e, "Active ME ping failed, removing dead writer");
cancel_ping.cancel();
if let Some(pool) = pool_ping.upgrade() {
pool.remove_writer_and_reroute(writer_id).await;
pool.remove_writer_and_close_clients(writer_id).await;
}
break;
}
@@ -405,12 +411,11 @@ impl MePool {
warn!(dc = %dc, "All ME servers for DC failed at init");
}
pub(crate) async fn remove_writer_and_reroute(&self, writer_id: u64) {
let mut queue = self.remove_writer_only(writer_id).await;
while let Some(bound) = queue.pop() {
if !self.reroute_conn(&bound, &mut queue).await {
let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
}
pub(crate) async fn remove_writer_and_close_clients(&self, writer_id: u64) {
let conns = self.remove_writer_only(writer_id).await;
for bound in conns {
let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
let _ = self.registry.unregister(bound.conn_id).await;
}
}
@@ -425,79 +430,28 @@ impl MePool {
self.registry.writer_lost(writer_id).await
}
async fn reroute_conn(&self, bound: &BoundConn, backlog: &mut Vec<BoundConn>) -> bool {
let payload = super::wire::build_proxy_req_payload(
bound.conn_id,
bound.meta.client_addr,
bound.meta.our_addr,
&[],
self.proxy_tag.as_deref(),
bound.meta.proto_flags,
);
let mut attempts = 0;
loop {
let writers_snapshot = {
let ws = self.writers.read().await;
if ws.is_empty() {
return false;
}
ws.clone()
};
let mut candidates = self.candidate_indices_for_dc(&writers_snapshot, bound.meta.target_dc).await;
if candidates.is_empty() {
return false;
}
candidates.sort_by_key(|idx| {
writers_snapshot[*idx]
.degraded
.load(Ordering::Relaxed)
.then_some(1usize)
.unwrap_or(0)
});
let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidates.len();
for offset in 0..candidates.len() {
let idx = candidates[(start + offset) % candidates.len()];
let w = &writers_snapshot[idx];
if let Ok(mut guard) = w.writer.try_lock() {
let send_res = guard.send(&payload).await;
drop(guard);
match send_res {
Ok(()) => {
self.registry
.bind_writer(bound.conn_id, w.id, w.writer.clone(), bound.meta.clone())
.await;
return true;
}
Err(e) => {
warn!(error = %e, writer_id = w.id, "ME reroute send failed");
backlog.extend(self.remove_writer_only(w.id).await);
}
}
continue;
}
}
let w = writers_snapshot[candidates[start]].clone();
match w.writer.lock().await.send(&payload).await {
Ok(()) => {
self.registry
.bind_writer(bound.conn_id, w.id, w.writer.clone(), bound.meta.clone())
.await;
return true;
}
Err(e) => {
warn!(error = %e, writer_id = w.id, "ME reroute send failed (blocking)");
backlog.extend(self.remove_writer_only(w.id).await);
}
}
attempts += 1;
if attempts > 3 {
return false;
pub(crate) async fn mark_writer_draining(self: &Arc<Self>, writer_id: u64) {
{
let mut ws = self.writers.write().await;
if let Some(w) = ws.iter_mut().find(|w| w.id == writer_id) {
w.draining.store(true, Ordering::Relaxed);
}
}
let pool = Arc::downgrade(self);
tokio::spawn(async move {
loop {
if let Some(p) = pool.upgrade() {
if p.registry.is_writer_empty(writer_id).await {
let _ = p.remove_writer_only(writer_id).await;
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
} else {
break;
}
}
});
}
}

View File

@@ -136,7 +136,7 @@ pub(crate) async fn reader_loop(
let mut pong = Vec::with_capacity(12);
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
pong.extend_from_slice(&ping_id.to_le_bytes());
if let Err(e) = writer.lock().await.send(&pong).await {
if let Err(e) = writer.lock().await.send_and_flush(&pong).await {
warn!(error = %e, "PONG send failed");
break;
}
@@ -176,7 +176,7 @@ async fn send_close_conn(writer: &Arc<Mutex<RpcWriter>>, conn_id: u64) {
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes());
if let Err(e) = writer.lock().await.send(&p).await {
if let Err(e) = writer.lock().await.send_and_flush(&p).await {
debug!(conn_id, error = %e, "Failed to send RPC_CLOSE_CONN");
}
}

View File

@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
@@ -28,12 +28,28 @@ pub struct ConnWriter {
pub writer: Arc<Mutex<RpcWriter>>,
}
struct RegistryInner {
map: HashMap<u64, mpsc::Sender<MeResponse>>,
writers: HashMap<u64, Arc<Mutex<RpcWriter>>>,
writer_for_conn: HashMap<u64, u64>,
conns_for_writer: HashMap<u64, HashSet<u64>>,
meta: HashMap<u64, ConnMeta>,
}
impl RegistryInner {
fn new() -> Self {
Self {
map: HashMap::new(),
writers: HashMap::new(),
writer_for_conn: HashMap::new(),
conns_for_writer: HashMap::new(),
meta: HashMap::new(),
}
}
}
pub struct ConnRegistry {
map: RwLock<HashMap<u64, mpsc::Sender<MeResponse>>>,
writers: RwLock<HashMap<u64, Arc<Mutex<RpcWriter>>>>,
writer_for_conn: RwLock<HashMap<u64, u64>>,
conns_for_writer: RwLock<HashMap<u64, Vec<u64>>>,
meta: RwLock<HashMap<u64, ConnMeta>>,
inner: RwLock<RegistryInner>,
next_id: AtomicU64,
}
@@ -41,11 +57,7 @@ impl ConnRegistry {
pub fn new() -> Self {
let start = rand::random::<u64>() | 1;
Self {
map: RwLock::new(HashMap::new()),
writers: RwLock::new(HashMap::new()),
writer_for_conn: RwLock::new(HashMap::new()),
conns_for_writer: RwLock::new(HashMap::new()),
meta: RwLock::new(HashMap::new()),
inner: RwLock::new(RegistryInner::new()),
next_id: AtomicU64::new(start),
}
}
@@ -53,23 +65,27 @@ impl ConnRegistry {
pub async fn register(&self) -> (u64, mpsc::Receiver<MeResponse>) {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = mpsc::channel(1024);
self.map.write().await.insert(id, tx);
self.inner.write().await.map.insert(id, tx);
(id, rx)
}
pub async fn unregister(&self, id: u64) {
self.map.write().await.remove(&id);
self.meta.write().await.remove(&id);
if let Some(writer_id) = self.writer_for_conn.write().await.remove(&id) {
if let Some(list) = self.conns_for_writer.write().await.get_mut(&writer_id) {
list.retain(|c| *c != id);
/// Unregister connection, returning associated writer_id if any.
pub async fn unregister(&self, id: u64) -> Option<u64> {
let mut inner = self.inner.write().await;
inner.map.remove(&id);
inner.meta.remove(&id);
if let Some(writer_id) = inner.writer_for_conn.remove(&id) {
if let Some(set) = inner.conns_for_writer.get_mut(&writer_id) {
set.remove(&id);
}
return Some(writer_id);
}
None
}
pub async fn route(&self, id: u64, resp: MeResponse) -> bool {
let m = self.map.read().await;
if let Some(tx) = m.get(&id) {
let inner = self.inner.read().await;
if let Some(tx) = inner.map.get(&id) {
tx.try_send(resp).is_ok()
} else {
false
@@ -83,40 +99,38 @@ impl ConnRegistry {
writer: Arc<Mutex<RpcWriter>>,
meta: ConnMeta,
) {
self.meta.write().await.entry(conn_id).or_insert(meta);
self.writer_for_conn.write().await.insert(conn_id, writer_id);
self.writers.write().await.entry(writer_id).or_insert_with(|| writer.clone());
self.conns_for_writer
.write()
.await
let mut inner = self.inner.write().await;
inner.meta.entry(conn_id).or_insert(meta);
inner.writer_for_conn.insert(conn_id, writer_id);
inner.writers.entry(writer_id).or_insert_with(|| writer.clone());
inner
.conns_for_writer
.entry(writer_id)
.or_insert_with(Vec::new)
.push(conn_id);
.or_insert_with(HashSet::new)
.insert(conn_id);
}
pub async fn get_writer(&self, conn_id: u64) -> Option<ConnWriter> {
let writer_id = {
let guard = self.writer_for_conn.read().await;
guard.get(&conn_id).cloned()
}?;
let writer = {
let guard = self.writers.read().await;
guard.get(&writer_id).cloned()
}?;
let inner = self.inner.read().await;
let writer_id = inner.writer_for_conn.get(&conn_id).cloned()?;
let writer = inner.writers.get(&writer_id).cloned()?;
Some(ConnWriter { writer_id, writer })
}
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
self.writers.write().await.remove(&writer_id);
let conns = self.conns_for_writer.write().await.remove(&writer_id).unwrap_or_default();
let mut inner = self.inner.write().await;
inner.writers.remove(&writer_id);
let conns = inner
.conns_for_writer
.remove(&writer_id)
.unwrap_or_default()
.into_iter()
.collect::<Vec<_>>();
let mut out = Vec::new();
let mut writer_for_conn = self.writer_for_conn.write().await;
let meta = self.meta.read().await;
for conn_id in conns {
writer_for_conn.remove(&conn_id);
if let Some(m) = meta.get(&conn_id) {
inner.writer_for_conn.remove(&conn_id);
if let Some(m) = inner.meta.get(&conn_id) {
out.push(BoundConn {
conn_id,
meta: m.clone(),
@@ -127,7 +141,16 @@ impl ConnRegistry {
}
pub async fn get_meta(&self, conn_id: u64) -> Option<ConnMeta> {
let guard = self.meta.read().await;
guard.get(&conn_id).cloned()
let inner = self.inner.read().await;
inner.meta.get(&conn_id).cloned()
}
pub async fn is_writer_empty(&self, writer_id: u64) -> bool {
let inner = self.inner.read().await;
inner
.conns_for_writer
.get(&writer_id)
.map(|s| s.is_empty())
.unwrap_or(true)
}
}

View File

@@ -31,8 +31,8 @@ pub async fn me_rotation_task(pool: Arc<MePool>, rng: Arc<SecureRandom>, interva
info!(addr = %w.addr, writer_id = w.id, "Rotating ME connection");
match pool.connect_one(w.addr, rng.as_ref()).await {
Ok(()) => {
// Remove old writer after new one is up.
pool.remove_writer_and_reroute(w.id).await;
// Mark old writer for graceful drain; removal happens when sessions finish.
pool.mark_writer_draining(w.id).await;
}
Err(e) => {
warn!(addr = %w.addr, writer_id = w.id, error = %e, "ME rotation connect failed");

View File

@@ -55,7 +55,7 @@ impl MePool {
Ok(()) => return Ok(()),
Err(e) => {
warn!(error = %e, writer_id = current.writer_id, "ME write failed");
self.remove_writer_and_reroute(current.writer_id).await;
self.remove_writer_and_close_clients(current.writer_id).await;
continue;
}
}
@@ -76,22 +76,29 @@ impl MePool {
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
}
emergency_attempts += 1;
let map = self.proxy_map_v4.read().await;
if let Some(addrs) = map.get(&(target_dc as i32)) {
let mut shuffled = addrs.clone();
shuffled.shuffle(&mut rand::rng());
drop(map);
for (ip, port) in shuffled {
let addr = SocketAddr::new(ip, port);
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
break;
for family in self.family_order() {
let map_guard = match family {
IpFamily::V4 => self.proxy_map_v4.read().await,
IpFamily::V6 => self.proxy_map_v6.read().await,
};
if let Some(addrs) = map_guard.get(&(target_dc as i32)) {
let mut shuffled = addrs.clone();
shuffled.shuffle(&mut rand::rng());
drop(map_guard);
for (ip, port) in shuffled {
let addr = SocketAddr::new(ip, port);
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
break;
}
}
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts)).await;
let ws2 = self.writers.read().await;
writers_snapshot = ws2.clone();
drop(ws2);
candidate_indices = self.candidate_indices_for_dc(&writers_snapshot, target_dc).await;
break;
}
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts)).await;
let ws2 = self.writers.read().await;
writers_snapshot = ws2.clone();
drop(ws2);
candidate_indices = self.candidate_indices_for_dc(&writers_snapshot, target_dc).await;
drop(map_guard);
}
if candidate_indices.is_empty() {
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
@@ -99,11 +106,10 @@ impl MePool {
}
candidate_indices.sort_by_key(|idx| {
writers_snapshot[*idx]
.degraded
.load(Ordering::Relaxed)
.then_some(1usize)
.unwrap_or(0)
let w = &writers_snapshot[*idx];
let degraded = w.degraded.load(Ordering::Relaxed);
let draining = w.draining.load(Ordering::Relaxed);
(draining as usize, degraded as usize)
});
let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len();
@@ -111,6 +117,9 @@ impl MePool {
for offset in 0..candidate_indices.len() {
let idx = candidate_indices[(start + offset) % candidate_indices.len()];
let w = &writers_snapshot[idx];
if w.draining.load(Ordering::Relaxed) {
continue;
}
if let Ok(mut guard) = w.writer.try_lock() {
let send_res = guard.send(&payload).await;
drop(guard);
@@ -123,7 +132,7 @@ impl MePool {
}
Err(e) => {
warn!(error = %e, writer_id = w.id, "ME write failed");
self.remove_writer_and_reroute(w.id).await;
self.remove_writer_and_close_clients(w.id).await;
continue;
}
}
@@ -131,6 +140,9 @@ impl MePool {
}
let w = writers_snapshot[candidate_indices[start]].clone();
if w.draining.load(Ordering::Relaxed) {
continue;
}
match w.writer.lock().await.send(&payload).await {
Ok(()) => {
self.registry
@@ -140,7 +152,7 @@ impl MePool {
}
Err(e) => {
warn!(error = %e, writer_id = w.id, "ME write failed (blocking)");
self.remove_writer_and_reroute(w.id).await;
self.remove_writer_and_close_clients(w.id).await;
}
}
}
@@ -151,9 +163,9 @@ impl MePool {
let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes());
if let Err(e) = w.writer.lock().await.send(&p).await {
if let Err(e) = w.writer.lock().await.send_and_flush(&p).await {
debug!(error = %e, "ME close write failed");
self.remove_writer_and_reroute(w.writer_id).await;
self.remove_writer_and_close_clients(w.writer_id).await;
}
} else {
debug!(conn_id, "ME close skipped (writer missing)");
@@ -213,17 +225,24 @@ impl MePool {
}
if preferred.is_empty() {
return (0..writers.len()).collect();
return (0..writers.len())
.filter(|i| !writers[*i].draining.load(Ordering::Relaxed))
.collect();
}
let mut out = Vec::new();
for (idx, w) in writers.iter().enumerate() {
if w.draining.load(Ordering::Relaxed) {
continue;
}
if preferred.iter().any(|p| *p == w.addr) {
out.push(idx);
}
}
if out.is_empty() {
return (0..writers.len()).collect();
return (0..writers.len())
.filter(|i| !writers[*i].draining.load(Ordering::Relaxed))
.collect();
}
out
}