Merge upstream/main into test/main-into-flow-sec

This commit is contained in:
David Osipov
2026-03-20 14:20:20 +04:00
21 changed files with 759 additions and 286 deletions

View File

@@ -19,7 +19,6 @@ use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32};
use super::codec::{RpcWriter, WriterCommand};
use super::pool::{MePool, MeWriter, WriterContour};
use super::reader::reader_loop;
use super::registry::BoundConn;
use super::wire::build_proxy_req_payload;
const ME_ACTIVE_PING_SECS: u64 = 25;
@@ -27,6 +26,12 @@ const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700;
#[derive(Clone, Copy)]
enum WriterTeardownMode {
Any,
DrainingOnly,
}
fn is_me_peer_closed_error(error: &ProxyError) -> bool {
matches!(error, ProxyError::Io(ioe) if ioe.kind() == ErrorKind::UnexpectedEof)
}
@@ -42,9 +47,6 @@ impl MePool {
}
for writer_id in closed_writer_ids {
if self.remove_writer_if_empty(writer_id).await {
continue;
}
let _ = self.remove_writer_and_close_clients(writer_id).await;
}
}
@@ -141,6 +143,9 @@ impl MePool {
crc_mode: hs.crc_mode,
};
let cancel_wr = cancel.clone();
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_writer = cleanup_done.clone();
let pool_writer_task = Arc::downgrade(self);
tokio::spawn(async move {
loop {
tokio::select! {
@@ -158,6 +163,16 @@ impl MePool {
_ = cancel_wr.cancelled() => break,
}
}
if cleanup_for_writer
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
if let Some(pool) = pool_writer_task.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await;
} else {
cancel_wr.cancel();
}
}
});
let writer = MeWriter {
id: writer_id,
@@ -194,7 +209,6 @@ impl MePool {
let cancel_ping = cancel.clone();
let tx_ping = tx.clone();
let ping_tracker_ping = ping_tracker.clone();
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_reader = cleanup_done.clone();
let cleanup_for_ping = cleanup_done.clone();
let keepalive_enabled = self.me_keepalive_enabled;
@@ -247,10 +261,9 @@ impl MePool {
if let Some(pool) = pool.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await;
} else {
// Pool is already gone during shutdown; do a local writer list cleanup only.
let mut ws = writers_arc.write().await;
ws.retain(|w| w.id != writer_id);
debug!(writer_id, remaining = ws.len(), "Writer removed during pool shutdown");
// Fallback for shutdown races: make writer task exit quickly so stale
// channels are observable by periodic prune.
cancel_reader_token.cancel();
}
}
if let Err(e) = res {
@@ -258,6 +271,8 @@ impl MePool {
warn!(error = %e, "ME reader ended");
}
}
let remaining = writers_arc.read().await.len();
debug!(writer_id, remaining, "ME reader task finished");
});
let pool_ping = Arc::downgrade(self);
@@ -497,33 +512,51 @@ impl MePool {
}
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
let conns = self.remove_writer_only(writer_id).await;
for bound in conns {
let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
let _ = self.registry.unregister(bound.conn_id).await;
}
// Full client cleanup now happens inside `registry.writer_lost` to keep
// writer reap/remove paths strictly non-blocking per connection.
let _ = self
.remove_writer_with_mode(writer_id, WriterTeardownMode::Any)
.await;
}
pub(crate) async fn remove_writer_if_empty(self: &Arc<Self>, writer_id: u64) -> bool {
if !self.registry.unregister_writer_if_empty(writer_id).await {
return false;
}
// The registry empty-check and unregister are atomic with respect to binds,
// so remove_writer_only cannot return active bound sessions here.
let _ = self.remove_writer_only(writer_id).await;
true
pub(super) async fn remove_draining_writer_hard_detach(
self: &Arc<Self>,
writer_id: u64,
) -> bool {
self.remove_writer_with_mode(writer_id, WriterTeardownMode::DrainingOnly)
.await
}
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> bool {
self.remove_writer_with_mode(writer_id, WriterTeardownMode::Any)
.await
}
// Authoritative teardown primitive shared by normal cleanup and watchdog path.
// Lock-order invariant:
// 1) mutate `writers` under pool write lock,
// 2) release pool lock,
// 3) run registry/metrics/refill side effects.
// `registry.writer_lost` must never run while `writers` lock is held.
async fn remove_writer_with_mode(
self: &Arc<Self>,
writer_id: u64,
mode: WriterTeardownMode,
) -> bool {
let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
let mut removed_addr: Option<SocketAddr> = None;
let mut removed_dc: Option<i32> = None;
let mut removed_uptime: Option<Duration> = None;
let mut trigger_refill = false;
let mut removed = false;
{
let mut ws = self.writers.write().await;
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
if matches!(mode, WriterTeardownMode::DrainingOnly)
&& !ws[pos].draining.load(Ordering::Relaxed)
{
return false;
}
let w = ws.remove(pos);
let was_draining = w.draining.load(Ordering::Relaxed);
if was_draining {
@@ -541,9 +574,15 @@ impl MePool {
}
close_tx = Some(w.tx.clone());
self.conn_count.fetch_sub(1, Ordering::Relaxed);
removed = true;
}
}
let conns = self.registry.writer_lost(writer_id).await;
// State invariant:
// - writer is removed from `self.writers` (pool visibility),
// - writer is removed from registry routing/binding maps via `writer_lost`.
// The close command below is only a best-effort accelerator for task shutdown.
// Cleanup progress must never depend on command-channel availability.
let _ = self.registry.writer_lost(writer_id).await;
{
let mut tracker = self.ping_tracker.lock().await;
tracker.retain(|_, (_, wid)| *wid != writer_id);
@@ -558,13 +597,17 @@ impl MePool {
// Quarantine flapping endpoints regardless of draining state.
self.maybe_quarantine_flapping_endpoint(addr, uptime).await;
}
if trigger_refill
&& let Some(addr) = removed_addr
&& let Some(writer_dc) = removed_dc
{
self.trigger_immediate_refill_for_dc(addr, writer_dc);
if let Some(addr) = removed_addr {
if let Some(uptime) = removed_uptime {
self.maybe_quarantine_flapping_endpoint(addr, uptime).await;
}
if trigger_refill
&& let Some(writer_dc) = removed_dc
{
self.trigger_immediate_refill_for_dc(addr, writer_dc);
}
}
conns
removed
}
pub(crate) async fn mark_writer_draining_with_timeout(