From ed4d1167dd6c33f0307e314c9b3c1807708b7215 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 20 Mar 2026 12:09:23 +0300 Subject: [PATCH] ME Writers Advanced Cleanup Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/health.rs | 92 ++++++++++++++++++++++- src/transport/middle_proxy/pool_refill.rs | 44 ++++++----- src/transport/middle_proxy/pool_writer.rs | 31 +++++--- 3 files changed, 133 insertions(+), 34 deletions(-) diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 8b62cff..5829de4 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -1327,6 +1327,33 @@ async fn recover_single_endpoint_outage( } let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms(); + let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine(); + if !bypass_quarantine { + let quarantine_remaining = { + let mut guard = pool.endpoint_quarantine.lock().await; + let quarantine_now = Instant::now(); + guard.retain(|_, expiry| *expiry > quarantine_now); + guard + .get(&endpoint) + .map(|expiry| expiry.saturating_duration_since(quarantine_now)) + }; + + if let Some(remaining) = quarantine_remaining + && !remaining.is_zero() + { + outage_next_attempt.insert(key, now + remaining); + debug!( + dc = %key.0, + family = ?key.1, + %endpoint, + required, + wait_ms = remaining.as_millis(), + "Single-endpoint outage reconnect deferred by endpoint quarantine" + ); + return; + } + } + if *reconnect_budget == 0 { outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250))); debug!( @@ -1342,7 +1369,6 @@ async fn recover_single_endpoint_outage( pool.stats .increment_me_single_endpoint_outage_reconnect_attempt_total(); - let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine(); let attempt_ok = if bypass_quarantine { pool.stats 
.increment_me_single_endpoint_quarantine_bypass_total(); @@ -1561,9 +1587,10 @@ mod tests { use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; - use super::reap_draining_writers; + use super::{reap_draining_writers, recover_single_endpoint_outage}; use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode}; use crate::crypto::SecureRandom; + use crate::network::IpFamily; use crate::network::probe::NetworkDecision; use crate::stats::Stats; use crate::transport::middle_proxy::codec::WriterCommand; @@ -1745,4 +1772,65 @@ mod tests { assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20); assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30); } + + #[tokio::test] + async fn removing_draining_writer_still_quarantines_flapping_endpoint() { + let pool = make_pool(1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let writer_id = 11u64; + let writer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000 + writer_id as u16); + let conn_id = + insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(5)).await; + + assert!(pool + .registry + .evict_bound_conn_if_writer(conn_id, writer_id) + .await); + pool.remove_writer_and_close_clients(writer_id).await; + + assert!(pool.is_endpoint_quarantined(writer_addr).await); + } + + #[tokio::test] + async fn single_endpoint_outage_respects_quarantine_when_bypass_disabled() { + let pool = make_pool(1).await; + pool.me_single_endpoint_outage_disable_quarantine + .store(false, Ordering::Relaxed); + + let key = (2, IpFamily::V4); + let endpoint = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7443); + let quarantine_ttl = Duration::from_millis(200); + { + let mut guard = pool.endpoint_quarantine.lock().await; + guard.insert(endpoint, Instant::now() + quarantine_ttl); + } + + let rng = Arc::new(SecureRandom::new()); + let mut outage_backoff = HashMap::new(); + let mut outage_next_attempt = HashMap::new(); + let mut 
reconnect_budget = 1usize; + let started_at = Instant::now(); + + recover_single_endpoint_outage( + &pool, + &rng, + key, + endpoint, + 1, + &mut outage_backoff, + &mut outage_next_attempt, + &mut reconnect_budget, + ) + .await; + + assert_eq!(reconnect_budget, 1); + assert_eq!( + pool.stats + .get_me_single_endpoint_outage_reconnect_attempt_total(), + 0 + ); + assert_eq!(pool.stats.get_me_single_endpoint_quarantine_bypass_total(), 0); + let next_attempt = outage_next_attempt.get(&key).copied().unwrap(); + assert!(next_attempt >= started_at + Duration::from_millis(120)); + } } diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 895bdb5..43e2e6b 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -49,28 +49,31 @@ impl MePool { return Vec::new(); } - let mut guard = self.endpoint_quarantine.lock().await; - let now = Instant::now(); - guard.retain(|_, expiry| *expiry > now); + loop { + let mut guard = self.endpoint_quarantine.lock().await; + let now = Instant::now(); + guard.retain(|_, expiry| *expiry > now); - let mut ready = Vec::<SocketAddr>::with_capacity(endpoints.len()); - let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None; - for addr in endpoints { - if let Some(expiry) = guard.get(addr).copied() { - match earliest_quarantine { - Some((_, current_expiry)) if current_expiry <= expiry => {} - _ => earliest_quarantine = Some((*addr, expiry)), + let mut ready = Vec::<SocketAddr>::with_capacity(endpoints.len()); + let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None; + for addr in endpoints { + if let Some(expiry) = guard.get(addr).copied() { + match earliest_quarantine { + Some((_, current_expiry)) if current_expiry <= expiry => {} + _ => earliest_quarantine = Some((*addr, expiry)), + } + } else { + ready.push(*addr); } - } else { - ready.push(*addr); } - } - if !ready.is_empty() { - return ready; - } + if !ready.is_empty() { + return ready; + } - if let
Some((addr, expiry)) = earliest_quarantine { + let Some((addr, expiry)) = earliest_quarantine else { + return Vec::new(); + }; let remaining = expiry.saturating_duration_since(now); if remaining.is_zero() { return vec![addr]; @@ -81,13 +84,8 @@ impl MePool { wait_ms = remaining.as_millis(), "All ME endpoints quarantined; waiting for earliest to expire" ); - // After sleeping, the quarantine entry is expired but not removed yet. - // Callers that check is_endpoint_quarantined() will lazily clean it via retain(). tokio::time::sleep(remaining).await; - return vec![addr]; } - - Vec::new() } pub(super) async fn has_refill_inflight_for_dc_key(&self, key: RefillDcKey) -> bool { @@ -319,4 +317,4 @@ impl MePool { dc_guard.remove(&dc_key); }); } -} \ No newline at end of file +} diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index fb1ba10..e368ead 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -142,6 +142,9 @@ impl MePool { seq_no: 0, crc_mode: hs.crc_mode, }; + let cleanup_done = Arc::new(AtomicBool::new(false)); + let cleanup_for_writer = cleanup_done.clone(); + let pool_writer = Arc::downgrade(self); let cancel_wr = cancel.clone(); tokio::spawn(async move { loop { @@ -160,6 +163,17 @@ impl MePool { _ = cancel_wr.cancelled() => break, } } + cancel_wr.cancel(); + if cleanup_for_writer + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() + { + if let Some(pool) = pool_writer.upgrade() { + pool.remove_writer_and_close_clients(writer_id).await; + } else { + debug!(writer_id, "ME writer cleanup skipped: pool dropped"); + } + } }); let writer = MeWriter { id: writer_id, @@ -196,7 +210,6 @@ impl MePool { let cancel_ping = cancel.clone(); let tx_ping = tx.clone(); let ping_tracker_ping = ping_tracker.clone(); - let cleanup_done = Arc::new(AtomicBool::new(false)); let cleanup_for_reader = cleanup_done.clone(); let cleanup_for_ping = 
cleanup_done.clone(); let keepalive_enabled = self.me_keepalive_enabled; @@ -242,6 +255,7 @@ impl MePool { stats_reader_close.increment_me_idle_close_by_peer_total(); info!(writer_id, "ME socket closed by peer on idle writer"); } + cancel_reader_token.cancel(); if cleanup_for_reader .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .is_ok() @@ -249,13 +263,12 @@ impl MePool { if let Some(pool) = pool.upgrade() { pool.remove_writer_and_close_clients(writer_id).await; } else { - // Pool is gone (shutdown). Remove writer from Vec directly - // as a last resort — no registry/refill side effects needed - // during shutdown. conn_count is not decremented here because - // the pool (and its counters) are already dropped. - let mut ws = writers_arc.write().await; - ws.retain(|w| w.id != writer_id); - debug!(writer_id, remaining = ws.len(), "Writer removed during pool shutdown"); + let remaining = writers_arc.read().await.len(); + debug!( + writer_id, + remaining, + "ME reader cleanup skipped: pool dropped" + ); } } if let Err(e) = res { @@ -658,4 +671,4 @@ impl MePool { } } } -} \ No newline at end of file +}