ME Pool V2

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-02-19 13:35:56 +03:00
parent 433e6c9a20
commit 35ae455e2b
13 changed files with 343 additions and 137 deletions

View File

@@ -10,6 +10,7 @@ use crate::network::IpFamily;
use crate::protocol::constants::RPC_CLOSE_EXT_U32;
use super::MePool;
use super::codec::WriterCommand;
use super::wire::build_proxy_req_payload;
use rand::seq::SliceRandom;
use super::registry::ConnMeta;
@@ -43,18 +44,15 @@ impl MePool {
loop {
if let Some(current) = self.registry.get_writer(conn_id).await {
let send_res = {
if let Ok(mut guard) = current.writer.try_lock() {
let r = guard.send(&payload).await;
drop(guard);
r
} else {
current.writer.lock().await.send(&payload).await
}
current
.tx
.send(WriterCommand::Data(payload.clone()))
.await
};
match send_res {
Ok(()) => return Ok(()),
Err(e) => {
warn!(error = %e, writer_id = current.writer_id, "ME write failed");
Err(_) => {
warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await;
continue;
}
@@ -64,7 +62,26 @@ impl MePool {
let mut writers_snapshot = {
let ws = self.writers.read().await;
if ws.is_empty() {
return Err(ProxyError::Proxy("All ME connections dead".into()));
drop(ws);
for family in self.family_order() {
let map = match family {
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
};
for (_dc, addrs) in map.iter() {
for (ip, port) in addrs {
let addr = SocketAddr::new(*ip, *port);
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
self.writer_available.notify_waiters();
break;
}
}
}
}
if tokio::time::timeout(Duration::from_secs(3), self.writer_available.notified()).await.is_err() {
return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into()));
}
continue;
}
ws.clone()
};
@@ -96,9 +113,10 @@ impl MePool {
writers_snapshot = ws2.clone();
drop(ws2);
candidate_indices = self.candidate_indices_for_dc(&writers_snapshot, target_dc).await;
break;
if !candidate_indices.is_empty() {
break;
}
}
drop(map_guard);
}
if candidate_indices.is_empty() {
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
@@ -120,22 +138,15 @@ impl MePool {
if w.draining.load(Ordering::Relaxed) {
continue;
}
if let Ok(mut guard) = w.writer.try_lock() {
let send_res = guard.send(&payload).await;
drop(guard);
match send_res {
Ok(()) => {
self.registry
.bind_writer(conn_id, w.id, w.writer.clone(), meta.clone())
.await;
return Ok(());
}
Err(e) => {
warn!(error = %e, writer_id = w.id, "ME write failed");
self.remove_writer_and_close_clients(w.id).await;
continue;
}
}
if w.tx.send(WriterCommand::Data(payload.clone())).await.is_ok() {
self.registry
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
.await;
return Ok(());
} else {
warn!(writer_id = w.id, "ME writer channel closed");
self.remove_writer_and_close_clients(w.id).await;
continue;
}
}
@@ -143,15 +154,15 @@ impl MePool {
if w.draining.load(Ordering::Relaxed) {
continue;
}
match w.writer.lock().await.send(&payload).await {
match w.tx.send(WriterCommand::Data(payload.clone())).await {
Ok(()) => {
self.registry
.bind_writer(conn_id, w.id, w.writer.clone(), meta.clone())
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
.await;
return Ok(());
}
Err(e) => {
warn!(error = %e, writer_id = w.id, "ME write failed (blocking)");
Err(_) => {
warn!(writer_id = w.id, "ME writer channel closed (blocking)");
self.remove_writer_and_close_clients(w.id).await;
}
}
@@ -163,8 +174,8 @@ impl MePool {
let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes());
if let Err(e) = w.writer.lock().await.send_and_flush(&p).await {
debug!(error = %e, "ME close write failed");
if w.tx.send(WriterCommand::DataAndFlush(p)).await.is_err() {
debug!("ME close write failed");
self.remove_writer_and_close_clients(w.writer_id).await;
}
} else {
@@ -176,7 +187,7 @@ impl MePool {
}
pub fn connection_count(&self) -> usize {
self.writers.try_read().map(|w| w.len()).unwrap_or(0)
self.conn_count.load(Ordering::Relaxed)
}
pub(super) async fn candidate_indices_for_dc(