ME Writers Advanced Cleanup

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-20 12:09:23 +03:00
parent e40361b171
commit ed4d1167dd
No known key found for this signature in database
3 changed files with 133 additions and 34 deletions

View File

@@ -1327,6 +1327,33 @@ async fn recover_single_endpoint_outage(
} }
let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms(); let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms();
let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine();
if !bypass_quarantine {
let quarantine_remaining = {
let mut guard = pool.endpoint_quarantine.lock().await;
let quarantine_now = Instant::now();
guard.retain(|_, expiry| *expiry > quarantine_now);
guard
.get(&endpoint)
.map(|expiry| expiry.saturating_duration_since(quarantine_now))
};
if let Some(remaining) = quarantine_remaining
&& !remaining.is_zero()
{
outage_next_attempt.insert(key, now + remaining);
debug!(
dc = %key.0,
family = ?key.1,
%endpoint,
required,
wait_ms = remaining.as_millis(),
"Single-endpoint outage reconnect deferred by endpoint quarantine"
);
return;
}
}
if *reconnect_budget == 0 { if *reconnect_budget == 0 {
outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250))); outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250)));
debug!( debug!(
@@ -1342,7 +1369,6 @@ async fn recover_single_endpoint_outage(
pool.stats pool.stats
.increment_me_single_endpoint_outage_reconnect_attempt_total(); .increment_me_single_endpoint_outage_reconnect_attempt_total();
let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine();
let attempt_ok = if bypass_quarantine { let attempt_ok = if bypass_quarantine {
pool.stats pool.stats
.increment_me_single_endpoint_quarantine_bypass_total(); .increment_me_single_endpoint_quarantine_bypass_total();
@@ -1561,9 +1587,10 @@ mod tests {
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use super::reap_draining_writers; use super::{reap_draining_writers, recover_single_endpoint_outage};
use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode}; use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::network::IpFamily;
use crate::network::probe::NetworkDecision; use crate::network::probe::NetworkDecision;
use crate::stats::Stats; use crate::stats::Stats;
use crate::transport::middle_proxy::codec::WriterCommand; use crate::transport::middle_proxy::codec::WriterCommand;
@@ -1745,4 +1772,65 @@ mod tests {
assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20); assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);
assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30); assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30);
} }
#[tokio::test]
async fn removing_draining_writer_still_quarantines_flapping_endpoint() {
    // A writer that was already draining (and whose bound conn got evicted)
    // must still push its endpoint into quarantine when it is removed, so a
    // flapping endpoint cannot be immediately re-dialed.
    let pool = make_pool(1).await;
    let flapping_writer = 11u64;
    let port = 4000u16 + flapping_writer as u16;
    let endpoint = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
    // Mark the writer as draining since 5 seconds in the past.
    let drained_since = MePool::now_epoch_secs().saturating_sub(5);
    let bound_conn = insert_draining_writer(&pool, flapping_writer, drained_since).await;

    // Evicting the bound conn first must succeed and must not prevent the
    // quarantine side effect of the subsequent removal.
    let evicted = pool
        .registry
        .evict_bound_conn_if_writer(bound_conn, flapping_writer)
        .await;
    assert!(evicted);

    pool.remove_writer_and_close_clients(flapping_writer).await;
    assert!(pool.is_endpoint_quarantined(endpoint).await);
}
#[tokio::test]
async fn single_endpoint_outage_respects_quarantine_when_bypass_disabled() {
    // With quarantine bypass turned off, a single-endpoint outage recovery
    // attempt against a quarantined endpoint must be deferred: no budget
    // spent, no attempt/bypass counters bumped, and a retry scheduled no
    // earlier than (roughly) the remaining quarantine window.
    let pool = make_pool(1).await;
    pool.me_single_endpoint_outage_disable_quarantine
        .store(false, Ordering::Relaxed);

    let dc_key = (2, IpFamily::V4);
    let quarantined_endpoint = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7443);
    let ttl = Duration::from_millis(200);
    pool.endpoint_quarantine
        .lock()
        .await
        .insert(quarantined_endpoint, Instant::now() + ttl);

    let rng = Arc::new(SecureRandom::new());
    let mut backoff_state = HashMap::new();
    let mut next_attempt_at = HashMap::new();
    let mut budget = 1usize;
    let before_call = Instant::now();

    recover_single_endpoint_outage(
        &pool,
        &rng,
        dc_key,
        quarantined_endpoint,
        1,
        &mut backoff_state,
        &mut next_attempt_at,
        &mut budget,
    )
    .await;

    // Deferral must leave the reconnect budget untouched.
    assert_eq!(budget, 1);
    // Neither an attempt nor a bypass may have been recorded.
    assert_eq!(
        pool.stats
            .get_me_single_endpoint_outage_reconnect_attempt_total(),
        0
    );
    assert_eq!(pool.stats.get_me_single_endpoint_quarantine_bypass_total(), 0);

    // A retry must be scheduled; 120ms lower bound leaves slack for the time
    // that elapsed between inserting the quarantine entry and the check.
    let deferred_until = next_attempt_at.get(&dc_key).copied().unwrap();
    assert!(deferred_until >= before_call + Duration::from_millis(120));
}
} }

View File

@@ -49,6 +49,7 @@ impl MePool {
return Vec::new(); return Vec::new();
} }
loop {
let mut guard = self.endpoint_quarantine.lock().await; let mut guard = self.endpoint_quarantine.lock().await;
let now = Instant::now(); let now = Instant::now();
guard.retain(|_, expiry| *expiry > now); guard.retain(|_, expiry| *expiry > now);
@@ -70,7 +71,9 @@ impl MePool {
return ready; return ready;
} }
if let Some((addr, expiry)) = earliest_quarantine { let Some((addr, expiry)) = earliest_quarantine else {
return Vec::new();
};
let remaining = expiry.saturating_duration_since(now); let remaining = expiry.saturating_duration_since(now);
if remaining.is_zero() { if remaining.is_zero() {
return vec![addr]; return vec![addr];
@@ -81,13 +84,8 @@ impl MePool {
wait_ms = remaining.as_millis(), wait_ms = remaining.as_millis(),
"All ME endpoints quarantined; waiting for earliest to expire" "All ME endpoints quarantined; waiting for earliest to expire"
); );
// After sleeping, the quarantine entry is expired but not removed yet.
// Callers that check is_endpoint_quarantined() will lazily clean it via retain().
tokio::time::sleep(remaining).await; tokio::time::sleep(remaining).await;
return vec![addr];
} }
Vec::new()
} }
pub(super) async fn has_refill_inflight_for_dc_key(&self, key: RefillDcKey) -> bool { pub(super) async fn has_refill_inflight_for_dc_key(&self, key: RefillDcKey) -> bool {

View File

@@ -142,6 +142,9 @@ impl MePool {
seq_no: 0, seq_no: 0,
crc_mode: hs.crc_mode, crc_mode: hs.crc_mode,
}; };
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_writer = cleanup_done.clone();
let pool_writer = Arc::downgrade(self);
let cancel_wr = cancel.clone(); let cancel_wr = cancel.clone();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
@@ -160,6 +163,17 @@ impl MePool {
_ = cancel_wr.cancelled() => break, _ = cancel_wr.cancelled() => break,
} }
} }
cancel_wr.cancel();
if cleanup_for_writer
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
if let Some(pool) = pool_writer.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await;
} else {
debug!(writer_id, "ME writer cleanup skipped: pool dropped");
}
}
}); });
let writer = MeWriter { let writer = MeWriter {
id: writer_id, id: writer_id,
@@ -196,7 +210,6 @@ impl MePool {
let cancel_ping = cancel.clone(); let cancel_ping = cancel.clone();
let tx_ping = tx.clone(); let tx_ping = tx.clone();
let ping_tracker_ping = ping_tracker.clone(); let ping_tracker_ping = ping_tracker.clone();
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_reader = cleanup_done.clone(); let cleanup_for_reader = cleanup_done.clone();
let cleanup_for_ping = cleanup_done.clone(); let cleanup_for_ping = cleanup_done.clone();
let keepalive_enabled = self.me_keepalive_enabled; let keepalive_enabled = self.me_keepalive_enabled;
@@ -242,6 +255,7 @@ impl MePool {
stats_reader_close.increment_me_idle_close_by_peer_total(); stats_reader_close.increment_me_idle_close_by_peer_total();
info!(writer_id, "ME socket closed by peer on idle writer"); info!(writer_id, "ME socket closed by peer on idle writer");
} }
cancel_reader_token.cancel();
if cleanup_for_reader if cleanup_for_reader
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok() .is_ok()
@@ -249,13 +263,12 @@ impl MePool {
if let Some(pool) = pool.upgrade() { if let Some(pool) = pool.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(writer_id).await;
} else { } else {
// Pool is gone (shutdown). Remove writer from Vec directly let remaining = writers_arc.read().await.len();
// as a last resort — no registry/refill side effects needed debug!(
// during shutdown. conn_count is not decremented here because writer_id,
// the pool (and its counters) are already dropped. remaining,
let mut ws = writers_arc.write().await; "ME reader cleanup skipped: pool dropped"
ws.retain(|w| w.id != writer_id); );
debug!(writer_id, remaining = ws.len(), "Writer removed during pool shutdown");
} }
} }
if let Err(e) = res { if let Err(e) = res {