diff --git a/src/api/model.rs b/src/api/model.rs index 0bc52de..31233d7 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -236,6 +236,8 @@ pub(super) struct MeWritersSummary { pub(super) required_writers: usize, pub(super) alive_writers: usize, pub(super) coverage_pct: f64, + pub(super) fresh_alive_writers: usize, + pub(super) fresh_coverage_pct: f64, } #[derive(Serialize, Clone)] @@ -250,6 +252,12 @@ pub(super) struct MeWriterStatus { pub(super) bound_clients: usize, pub(super) idle_for_secs: Option, pub(super) rtt_ema_ms: Option, + pub(super) matches_active_generation: bool, + pub(super) in_desired_map: bool, + pub(super) allow_drain_fallback: bool, + pub(super) drain_started_at_epoch_secs: Option, + pub(super) drain_deadline_epoch_secs: Option, + pub(super) drain_over_ttl: bool, } #[derive(Serialize, Clone)] @@ -276,6 +284,8 @@ pub(super) struct DcStatus { pub(super) floor_capped: bool, pub(super) alive_writers: usize, pub(super) coverage_pct: f64, + pub(super) fresh_alive_writers: usize, + pub(super) fresh_coverage_pct: f64, pub(super) rtt_ms: Option, pub(super) load: usize, } diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index 139a4c5..9260c40 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -314,6 +314,8 @@ async fn get_minimal_payload_cached( required_writers: status.required_writers, alive_writers: status.alive_writers, coverage_pct: status.coverage_pct, + fresh_alive_writers: status.fresh_alive_writers, + fresh_coverage_pct: status.fresh_coverage_pct, }, writers: status .writers @@ -329,6 +331,12 @@ async fn get_minimal_payload_cached( bound_clients: entry.bound_clients, idle_for_secs: entry.idle_for_secs, rtt_ema_ms: entry.rtt_ema_ms, + matches_active_generation: entry.matches_active_generation, + in_desired_map: entry.in_desired_map, + allow_drain_fallback: entry.allow_drain_fallback, + drain_started_at_epoch_secs: entry.drain_started_at_epoch_secs, + drain_deadline_epoch_secs: entry.drain_deadline_epoch_secs, + drain_over_ttl: entry.drain_over_ttl, }) .collect(), }; @@ -363,6 +371,8 @@ async fn get_minimal_payload_cached( floor_capped: entry.floor_capped, alive_writers: entry.alive_writers, coverage_pct: entry.coverage_pct, + fresh_alive_writers: entry.fresh_alive_writers, + fresh_coverage_pct: entry.fresh_coverage_pct, rtt_ms: entry.rtt_ms, load: entry.load, }) @@ -486,6 +496,8 @@ fn disabled_me_writers(now_epoch_secs: u64, reason: &'static str) -> MeWritersDa required_writers: 0, alive_writers: 0, coverage_pct: 0.0, + fresh_alive_writers: 0, + fresh_coverage_pct: 0.0, }, writers: Vec::new(), } diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index b422dc6..3cc400f 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -62,6 +62,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut drain_warn_next_allowed: HashMap = HashMap::new(); let mut degraded_interval = true; loop { let interval = if degraded_interval { @@ -71,7 +72,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c }; tokio::time::sleep(interval).await; pool.prune_closed_writers().await; - reap_draining_writers(&pool).await; + reap_draining_writers(&pool, &mut drain_warn_next_allowed).await; let v4_degraded = check_family( IpFamily::V4, &pool, @@ -110,17 +111,47 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c } } -async fn reap_draining_writers(pool: &Arc) { +async fn reap_draining_writers( + pool: &Arc, + warn_next_allowed: &mut HashMap, +) { let now_epoch_secs = MePool::now_epoch_secs(); + let now = Instant::now(); + let drain_ttl_secs = pool.me_pool_drain_ttl_secs.load(std::sync::atomic::Ordering::Relaxed); let writers = pool.writers.read().await.clone(); for writer in writers { if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) { continue; } - if pool.registry.is_writer_empty(writer.id).await { + let is_empty = pool.registry.is_writer_empty(writer.id).await; + if is_empty { pool.remove_writer_and_close_clients(writer.id).await; continue; } + let drain_started_at_epoch_secs = writer + .draining_started_at_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed); + if drain_ttl_secs > 0 + && drain_started_at_epoch_secs != 0 + && now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs + && should_emit_writer_warn( + warn_next_allowed, + writer.id, + now, + pool.warn_rate_limit_duration(), + ) + { + warn!( + writer_id = writer.id, + writer_dc = writer.writer_dc, + endpoint = %writer.addr, + generation = writer.generation, + drain_ttl_secs, + force_close_secs = pool.me_pool_force_close_secs.load(std::sync::atomic::Ordering::Relaxed), + allow_drain_fallback = writer.allow_drain_fallback.load(std::sync::atomic::Ordering::Relaxed), + "ME draining writer remains non-empty past drain TTL" + ); + } let deadline_epoch_secs = writer .drain_deadline_epoch_secs .load(std::sync::atomic::Ordering::Relaxed); @@ -132,6 +163,23 @@ async fn reap_draining_writers(pool: &Arc) { } } +fn should_emit_writer_warn( + next_allowed: &mut HashMap, + writer_id: u64, + now: Instant, + cooldown: Duration, +) -> bool { + let Some(ready_at) = next_allowed.get(&writer_id).copied() else { + next_allowed.insert(writer_id, now + cooldown); + return true; + }; + if now >= ready_at { + next_allowed.insert(writer_id, now + cooldown); + return true; + } + false +} + async fn check_family( family: IpFamily, pool: &Arc, diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index 6673cf2..99070a8 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -19,6 +19,12 @@ pub(crate) struct MeApiWriterStatusSnapshot { pub bound_clients: usize, pub idle_for_secs: Option, pub rtt_ema_ms: Option, + pub matches_active_generation: bool, + pub in_desired_map: bool, + pub allow_drain_fallback: bool, + pub drain_started_at_epoch_secs: Option, + pub drain_deadline_epoch_secs: Option, + pub drain_over_ttl: bool, } #[derive(Clone, Debug)] @@ -35,6 +41,8 @@ pub(crate) struct MeApiDcStatusSnapshot { pub floor_capped: bool, pub alive_writers: usize, pub coverage_pct: f64, + pub fresh_alive_writers: usize, + pub fresh_coverage_pct: f64, pub rtt_ms: Option, pub load: usize, } @@ -55,6 +63,8 @@ pub(crate) struct MeApiStatusSnapshot { pub required_writers: usize, pub alive_writers: usize, pub coverage_pct: f64, + pub fresh_alive_writers: usize, + pub fresh_coverage_pct: f64, pub writers: Vec, pub dcs: Vec, } @@ -213,6 +223,8 @@ impl MePool { pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot { let now_epoch_secs = Self::now_epoch_secs(); + let active_generation = self.current_generation(); + let drain_ttl_secs = self.me_pool_drain_ttl_secs.load(Ordering::Relaxed); let mut endpoints_by_dc = BTreeMap::>::new(); if self.decision.ipv4_me { @@ -239,6 +251,7 @@ impl MePool { let mut live_writers_by_dc_endpoint = HashMap::<(i16, SocketAddr), usize>::new(); let mut live_writers_by_dc = HashMap::::new(); + let mut fresh_writers_by_dc = HashMap::::new(); let mut dc_rtt_agg = HashMap::::new(); let mut writer_rows = Vec::::with_capacity(writers.len()); @@ -247,6 +260,10 @@ impl MePool { let dc = i16::try_from(writer.writer_dc).ok(); let draining = writer.draining.load(Ordering::Relaxed); let degraded = writer.degraded.load(Ordering::Relaxed); + let matches_active_generation = writer.generation == active_generation; + let in_desired_map = dc + .and_then(|dc_idx| endpoints_by_dc.get(&dc_idx)) + .is_some_and(|endpoints| endpoints.contains(&endpoint)); let bound_clients = activity .bound_clients_by_writer .get(&writer.id) @@ -256,6 +273,21 @@ impl MePool { .get(&writer.id) .map(|idle_ts| now_epoch_secs.saturating_sub(*idle_ts)); let rtt_ema_ms = rtt.get(&writer.id).map(|(_, ema)| *ema); + let allow_drain_fallback = writer.allow_drain_fallback.load(Ordering::Relaxed); + let drain_started_at_epoch_secs = writer + .draining_started_at_epoch_secs + .load(Ordering::Relaxed); + let drain_deadline_epoch_secs = writer + .drain_deadline_epoch_secs + .load(Ordering::Relaxed); + let drain_started_at_epoch_secs = + (drain_started_at_epoch_secs != 0).then_some(drain_started_at_epoch_secs); + let drain_deadline_epoch_secs = + (drain_deadline_epoch_secs != 0).then_some(drain_deadline_epoch_secs); + let drain_over_ttl = draining + && drain_ttl_secs > 0 + && drain_started_at_epoch_secs + .is_some_and(|started| now_epoch_secs.saturating_sub(started) > drain_ttl_secs); let state = match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) { WriterContour::Warm => "warm", WriterContour::Active => "active", @@ -273,6 +305,9 @@ impl MePool { entry.0 += ema_ms; entry.1 += 1; } + if matches_active_generation && in_desired_map { + *fresh_writers_by_dc.entry(dc_idx).or_insert(0) += 1; + } } } @@ -287,6 +322,12 @@ impl MePool { bound_clients, idle_for_secs, rtt_ema_ms, + matches_active_generation, + in_desired_map, + allow_drain_fallback, + drain_started_at_epoch_secs, + drain_deadline_epoch_secs, + drain_over_ttl, }); } @@ -295,6 +336,7 @@ impl MePool { let mut dcs = Vec::::with_capacity(endpoints_by_dc.len()); let mut available_endpoints = 0usize; let mut alive_writers = 0usize; + let mut fresh_alive_writers = 0usize; let floor_mode = self.floor_mode(); let adaptive_cpu_cores = (self .me_adaptive_floor_cpu_cores_effective @@ -333,6 +375,7 @@ impl MePool { let floor_capped = matches!(floor_mode, MeFloorMode::Adaptive) && dc_required_writers < base_required; let dc_alive_writers = live_writers_by_dc.get(&dc).copied().unwrap_or(0); + let dc_fresh_alive_writers = fresh_writers_by_dc.get(&dc).copied().unwrap_or(0); let dc_load = activity .active_sessions_by_target_dc .get(&dc) @@ -344,6 +387,7 @@ impl MePool { available_endpoints += dc_available_endpoints; alive_writers += dc_alive_writers; + fresh_alive_writers += dc_fresh_alive_writers; dcs.push(MeApiDcStatusSnapshot { dc, @@ -367,6 +411,8 @@ impl MePool { floor_capped, alive_writers: dc_alive_writers, coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers), + fresh_alive_writers: dc_fresh_alive_writers, + fresh_coverage_pct: ratio_pct(dc_fresh_alive_writers, dc_required_writers), rtt_ms: dc_rtt_ms, load: dc_load, }); @@ -381,6 +427,8 @@ impl MePool { required_writers, alive_writers, coverage_pct: ratio_pct(alive_writers, required_writers), + fresh_alive_writers, + fresh_coverage_pct: ratio_pct(fresh_alive_writers, required_writers), writers: writer_rows, dcs, } diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 64fb700..8ce3de3 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -178,6 +178,7 @@ impl MePool { allow_drain_fallback: allow_drain_fallback.clone(), }; self.writers.write().await.push(writer.clone()); + self.registry.register_writer(writer_id, tx.clone()).await; self.registry.mark_writer_idle(writer_id).await; self.conn_count.fetch_add(1, Ordering::Relaxed); self.writer_available.notify_one(); @@ -414,9 +415,15 @@ impl MePool { }; let (conn_id, mut service_rx) = pool.registry.register().await; - pool.registry - .bind_writer(conn_id, writer_id, tx_signal.clone(), meta.clone()) - .await; + if !pool + .registry + .bind_writer(conn_id, writer_id, meta.clone()) + .await + { + let _ = pool.registry.unregister(conn_id).await; + stats_signal.increment_me_rpc_proxy_req_signal_skipped_no_meta_total(); + continue; + } let payload = build_proxy_req_payload( conn_id, @@ -521,6 +528,12 @@ impl MePool { self.conn_count.fetch_sub(1, Ordering::Relaxed); } } + let conns = self.registry.writer_lost(writer_id).await; + { + let mut tracker = self.ping_tracker.lock().await; + tracker.retain(|_, (_, wid)| *wid != writer_id); + } + self.rtt_stats.lock().await.remove(&writer_id); if let Some(tx) = close_tx { let _ = tx.send(WriterCommand::Close).await; } @@ -533,8 +546,7 @@ impl MePool { } self.trigger_immediate_refill_for_dc(addr, writer_dc); } - self.rtt_stats.lock().await.remove(&writer_id); - self.registry.writer_lost(writer_id).await + conns } pub(crate) async fn mark_writer_draining_with_timeout( diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index ee04969..cc3028b 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -138,6 +138,15 @@ impl ConnRegistry { (id, rx) } + pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender) { + let mut inner = self.inner.write().await; + inner.writers.insert(writer_id, tx); + inner + .conns_for_writer + .entry(writer_id) + .or_insert_with(HashSet::new); + } + /// Unregister connection, returning associated writer_id if any. pub async fn unregister(&self, id: u64) -> Option { let mut inner = self.inner.write().await; @@ -282,24 +291,39 @@ impl ConnRegistry { } } - pub async fn bind_writer( - &self, - conn_id: u64, - writer_id: u64, - tx: mpsc::Sender, - meta: ConnMeta, - ) { + pub async fn bind_writer(&self, conn_id: u64, writer_id: u64, meta: ConnMeta) -> bool { let mut inner = self.inner.write().await; - inner.meta.entry(conn_id).or_insert(meta.clone()); - inner.writer_for_conn.insert(conn_id, writer_id); + if !inner.writers.contains_key(&writer_id) { + return false; + } + + let previous_writer_id = inner.writer_for_conn.insert(conn_id, writer_id); + if let Some(previous_writer_id) = previous_writer_id + && previous_writer_id != writer_id + { + let became_empty = if let Some(set) = inner.conns_for_writer.get_mut(&previous_writer_id) + { + set.remove(&conn_id); + set.is_empty() + } else { + false + }; + if became_empty { + inner + .writer_idle_since_epoch_secs + .insert(previous_writer_id, Self::now_epoch_secs()); + } + } + + inner.meta.insert(conn_id, meta.clone()); inner.last_meta_for_writer.insert(writer_id, meta); inner.writer_idle_since_epoch_secs.remove(&writer_id); - inner.writers.entry(writer_id).or_insert_with(|| tx.clone()); inner .conns_for_writer .entry(writer_id) .or_insert_with(HashSet::new) .insert(conn_id); + true } pub async fn mark_writer_idle(&self, writer_id: u64) { @@ -384,6 +408,9 @@ impl ConnRegistry { let mut out = Vec::new(); for conn_id in conns { + if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) { + continue; + } inner.writer_for_conn.remove(&conn_id); if let Some(m) = inner.meta.get(&conn_id) { out.push(BoundConn { @@ -427,47 +454,52 @@ mod tests { let (conn_c, _rx_c) = registry.register().await; let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8); let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8); + registry.register_writer(10, writer_tx_a.clone()).await; + registry.register_writer(20, writer_tx_b.clone()).await; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); - registry - .bind_writer( - conn_a, - 10, - writer_tx_a.clone(), - ConnMeta { - target_dc: 2, - client_addr: addr, - our_addr: addr, - proto_flags: 0, - }, - ) - .await; - registry - .bind_writer( - conn_b, - 10, - writer_tx_a, - ConnMeta { - target_dc: -2, - client_addr: addr, - our_addr: addr, - proto_flags: 0, - }, - ) - .await; - registry - .bind_writer( - conn_c, - 20, - writer_tx_b, - ConnMeta { - target_dc: 4, - client_addr: addr, - our_addr: addr, - proto_flags: 0, - }, - ) - .await; + assert!( + registry + .bind_writer( + conn_a, + 10, + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); + assert!( + registry + .bind_writer( + conn_b, + 10, + ConnMeta { + target_dc: -2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); + assert!( + registry + .bind_writer( + conn_c, + 20, + ConnMeta { + target_dc: 4, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); let snapshot = registry.writer_activity_snapshot().await; assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&2)); @@ -476,4 +508,130 @@ mod tests { assert_eq!(snapshot.active_sessions_by_target_dc.get(&-2), Some(&1)); assert_eq!(snapshot.active_sessions_by_target_dc.get(&4), Some(&1)); } + + #[tokio::test] + async fn bind_writer_rebinds_conn_atomically() { + let registry = ConnRegistry::new(); + let (conn_id, _rx) = registry.register().await; + let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8); + let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8); + registry.register_writer(10, writer_tx_a).await; + registry.register_writer(20, writer_tx_b).await; + + let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); + let first_our_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)), 443); + let second_our_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(2, 2, 2, 2)), 443); + + assert!( + registry + .bind_writer( + conn_id, + 10, + ConnMeta { + target_dc: 2, + client_addr, + our_addr: first_our_addr, + proto_flags: 1, + }, + ) + .await + ); + assert!( + registry + .bind_writer( + conn_id, + 20, + ConnMeta { + target_dc: 2, + client_addr, + our_addr: second_our_addr, + proto_flags: 2, + }, + ) + .await + ); + + let writer = registry.get_writer(conn_id).await.expect("writer binding"); + assert_eq!(writer.writer_id, 20); + + let meta = registry.get_meta(conn_id).await.expect("conn meta"); + assert_eq!(meta.our_addr, second_our_addr); + assert_eq!(meta.proto_flags, 2); + + let snapshot = registry.writer_activity_snapshot().await; + assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&0)); + assert_eq!(snapshot.bound_clients_by_writer.get(&20), Some(&1)); + assert!(registry.writer_idle_since_snapshot().await.contains_key(&10)); + } + + #[tokio::test] + async fn writer_lost_does_not_drop_rebound_conn() { + let registry = ConnRegistry::new(); + let (conn_id, _rx) = registry.register().await; + let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8); + let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8); + registry.register_writer(10, writer_tx_a).await; + registry.register_writer(20, writer_tx_b).await; + + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); + assert!( + registry + .bind_writer( + conn_id, + 10, + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); + assert!( + registry + .bind_writer( + conn_id, + 20, + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 1, + }, + ) + .await + ); + + let lost = registry.writer_lost(10).await; + assert!(lost.is_empty()); + assert_eq!(registry.get_writer(conn_id).await.expect("writer").writer_id, 20); + + let removed_writer = registry.unregister(conn_id).await; + assert_eq!(removed_writer, Some(20)); + assert!(registry.is_writer_empty(20).await); + } + + #[tokio::test] + async fn bind_writer_rejects_unregistered_writer() { + let registry = ConnRegistry::new(); + let (conn_id, _rx) = registry.register().await; + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); + + assert!( + !registry + .bind_writer( + conn_id, + 10, + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); + assert!(registry.get_writer(conn_id).await.is_none()); + } } diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index f63662b..0f9fed6 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -375,9 +375,14 @@ impl MePool { match w.tx.try_send(WriterCommand::Data(payload.clone())) { Ok(()) => { self.stats.increment_me_writer_pick_success_try_total(pick_mode); - self.registry - .bind_writer(conn_id, w.id, w.tx.clone(), meta) - .await; + if !self.registry.bind_writer(conn_id, w.id, meta).await { + debug!( + conn_id, + writer_id = w.id, + "ME writer disappeared before bind commit, retrying" + ); + continue; + } if w.generation < self.current_generation() { self.stats.increment_pool_stale_pick_total(); debug!( @@ -421,9 +426,14 @@ impl MePool { Ok(()) => { self.stats .increment_me_writer_pick_success_fallback_total(pick_mode); - self.registry - .bind_writer(conn_id, w.id, w.tx.clone(), meta) - .await; + if !self.registry.bind_writer(conn_id, w.id, meta).await { + debug!( + conn_id, + writer_id = w.id, + "ME writer disappeared before fallback bind commit, retrying" + ); + continue; + } if w.generation < self.current_generation() { self.stats.increment_pool_stale_pick_total(); }