From 50e9e5cf32721c79424d449853f71589ad0f52a2 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 18 Apr 2026 00:34:35 +0300 Subject: [PATCH] Active Ring and DRR Hardening Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- .../middle_proxy/fairness/scheduler.rs | 450 +++++++++++++----- .../tests/fairness_security_tests.rs | 6 + 2 files changed, 339 insertions(+), 117 deletions(-) diff --git a/src/transport/middle_proxy/fairness/scheduler.rs b/src/transport/middle_proxy/fairness/scheduler.rs index 8da3636..4adb8e4 100644 --- a/src/transport/middle_proxy/fairness/scheduler.rs +++ b/src/transport/middle_proxy/fairness/scheduler.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, Instant}; use bytes::Bytes; @@ -86,6 +86,7 @@ pub(crate) struct WorkerFairnessState { pressure: PressureEvaluator, flows: HashMap, active_ring: VecDeque, + active_ring_members: HashSet, total_queued_bytes: u64, bucket_queued_bytes: Vec, bucket_active_flows: Vec, @@ -108,6 +109,7 @@ impl WorkerFairnessState { pressure: PressureEvaluator::new(now), flows: HashMap::new(), active_ring: VecDeque::new(), + active_ring_members: HashSet::new(), total_queued_bytes: 0, bucket_queued_bytes: vec![0; bucket_count], bucket_active_flows: vec![0; bucket_count], @@ -242,11 +244,24 @@ impl WorkerFairnessState { self.bucket_queued_bytes[bucket_id] = self.bucket_queued_bytes[bucket_id].saturating_add(frame_bytes); + let mut enqueue_active = false; if !entry.fairness.in_active_ring { entry.fairness.in_active_ring = true; - self.active_ring.push_back(conn_id); + enqueue_active = true; } + let pressure_state = self.pressure.state(); + let (before_membership, after_membership) = { + let before = Self::flow_membership(&entry.fairness); + Self::classify_flow(&self.config, pressure_state, now, &mut entry.fairness); + let after = Self::flow_membership(&entry.fairness); + (before, after) + }; + if enqueue_active { + self.enqueue_active_conn(conn_id); + } + self.apply_flow_membership_delta(before_membership, after_membership); + self.evaluate_pressure(now, true); AdmissionDecision::Admit } @@ -260,62 +275,92 @@ impl WorkerFairnessState { let Some(conn_id) = self.active_ring.pop_front() else { break; }; + if !self.active_ring_members.remove(&conn_id) { + continue; + } let mut candidate = None; let mut requeue_active = false; let mut drained_bytes = 0u64; let mut bucket_id = 0usize; + let mut should_continue = false; + let mut enqueue_active = false; + let mut membership_delta = None; let pressure_state = self.pressure.state(); if let Some(flow) = self.flows.get_mut(&conn_id) { bucket_id = flow.fairness.bucket_id; + flow.fairness.in_active_ring = false; + let before_membership = Self::flow_membership(&flow.fairness); if flow.queue.is_empty() { flow.fairness.in_active_ring = false; flow.fairness.scheduler_state = FlowSchedulerState::Idle; flow.fairness.pending_bytes = 0; + flow.fairness.deficit_bytes = 0; flow.fairness.queue_started_at = None; - continue; - } + should_continue = true; + } else { + Self::classify_flow(&self.config, pressure_state, now, &mut flow.fairness); - Self::classify_flow(&self.config, pressure_state, now, &mut flow.fairness); - - let quantum = - Self::effective_quantum_bytes(&self.config, pressure_state, &flow.fairness); - flow.fairness.deficit_bytes = flow - .fairness - .deficit_bytes - .saturating_add(i64::from(quantum)); - self.deficit_grants = self.deficit_grants.saturating_add(1); - - let front_len = flow.queue.front().map_or(0, |front| front.queued_bytes()); - if flow.fairness.deficit_bytes < front_len as i64 { - flow.fairness.consecutive_skips = - flow.fairness.consecutive_skips.saturating_add(1); - self.deficit_skips = self.deficit_skips.saturating_add(1); - requeue_active = true; - } else if let Some(frame) = flow.queue.pop_front() { - drained_bytes = frame.queued_bytes(); - flow.fairness.pending_bytes = - flow.fairness.pending_bytes.saturating_sub(drained_bytes); + let quantum = Self::effective_quantum_bytes( + &self.config, + pressure_state, + &flow.fairness, + ); flow.fairness.deficit_bytes = flow .fairness .deficit_bytes - .saturating_sub(drained_bytes as i64); - flow.fairness.consecutive_skips = 0; - flow.fairness.queue_started_at = - flow.queue.front().map(|front| front.enqueued_at); - requeue_active = !flow.queue.is_empty(); - if !requeue_active { - flow.fairness.scheduler_state = FlowSchedulerState::Idle; - flow.fairness.in_active_ring = false; + .saturating_add(i64::from(quantum)); + Self::clamp_deficit_bytes(&self.config, &mut flow.fairness); + self.deficit_grants = self.deficit_grants.saturating_add(1); + + let front_len = flow.queue.front().map_or(0, |front| front.queued_bytes()); + if flow.fairness.deficit_bytes < front_len as i64 { + flow.fairness.consecutive_skips = + flow.fairness.consecutive_skips.saturating_add(1); + self.deficit_skips = self.deficit_skips.saturating_add(1); + requeue_active = true; + flow.fairness.in_active_ring = true; + enqueue_active = true; + } else if let Some(frame) = flow.queue.pop_front() { + drained_bytes = frame.queued_bytes(); + flow.fairness.pending_bytes = + flow.fairness.pending_bytes.saturating_sub(drained_bytes); + flow.fairness.deficit_bytes = flow + .fairness + .deficit_bytes + .saturating_sub(drained_bytes as i64); + Self::clamp_deficit_bytes(&self.config, &mut flow.fairness); + flow.fairness.consecutive_skips = 0; + flow.fairness.queue_started_at = + flow.queue.front().map(|front| front.enqueued_at); + requeue_active = !flow.queue.is_empty(); + if !requeue_active { + flow.fairness.scheduler_state = FlowSchedulerState::Idle; + flow.fairness.in_active_ring = false; + flow.fairness.deficit_bytes = 0; + } else { + flow.fairness.in_active_ring = true; + enqueue_active = true; + } + candidate = Some(DispatchCandidate { + pressure_state, + flow_class: flow.fairness.pressure_class, + frame, + }); } - candidate = Some(DispatchCandidate { - pressure_state, - flow_class: flow.fairness.pressure_class, - frame, - }); } + + membership_delta = Some((before_membership, Self::flow_membership(&flow.fairness))); + } + + if let Some((before_membership, after_membership)) = membership_delta { + self.apply_flow_membership_delta(before_membership, after_membership); + } + + if should_continue { + continue; } if drained_bytes > 0 { @@ -324,11 +369,8 @@ impl WorkerFairnessState { self.bucket_queued_bytes[bucket_id].saturating_sub(drained_bytes); } - if requeue_active { - if let Some(flow) = self.flows.get_mut(&conn_id) { - flow.fairness.in_active_ring = true; - } - self.active_ring.push_back(conn_id); + if requeue_active && enqueue_active { + self.enqueue_active_conn(conn_id); } if let Some(candidate) = candidate { @@ -348,7 +390,9 @@ impl WorkerFairnessState { ) -> DispatchAction { match feedback { DispatchFeedback::Routed => { + let mut membership_delta = None; if let Some(flow) = self.flows.get_mut(&conn_id) { + let before_membership = Self::flow_membership(&flow.fairness); flow.fairness.last_drain_at = Some(now); flow.fairness.recent_drain_bytes = flow .fairness @@ -358,6 +402,17 @@ impl WorkerFairnessState { if flow.fairness.scheduler_state != FlowSchedulerState::Idle { flow.fairness.scheduler_state = FlowSchedulerState::Active; } + Self::classify_flow( + &self.config, + self.pressure.state(), + now, + &mut flow.fairness, + ); + membership_delta = + Some((before_membership, Self::flow_membership(&flow.fairness))); + } + if let Some((before_membership, after_membership)) = membership_delta { + self.apply_flow_membership_delta(before_membership, after_membership); } self.evaluate_pressure(now, false); DispatchAction::Continue @@ -365,47 +420,65 @@ impl WorkerFairnessState { DispatchFeedback::QueueFull => { self.pressure.note_route_stall(now, &self.config.pressure); self.downstream_stalls = self.downstream_stalls.saturating_add(1); + let state = self.pressure.state(); let Some(flow) = self.flows.get_mut(&conn_id) else { self.evaluate_pressure(now, true); return DispatchAction::Continue; }; + let (before_membership, after_membership, should_close_flow, enqueue_active) = { + let before_membership = Self::flow_membership(&flow.fairness); + let mut enqueue_active = false; - flow.fairness.consecutive_stalls = - flow.fairness.consecutive_stalls.saturating_add(1); - flow.fairness.scheduler_state = FlowSchedulerState::Backpressured; - flow.fairness.pressure_class = FlowPressureClass::Backpressured; + flow.fairness.consecutive_stalls = + flow.fairness.consecutive_stalls.saturating_add(1); + flow.fairness.scheduler_state = FlowSchedulerState::Backpressured; + flow.fairness.pressure_class = FlowPressureClass::Backpressured; - let state = self.pressure.state(); - let should_shed_frame = matches!(state, PressureState::Saturated) - || (matches!(state, PressureState::Shedding) - && flow.fairness.standing_state == StandingQueueState::Standing - && flow.fairness.consecutive_stalls - >= self.config.max_consecutive_stalls_before_shed); + let should_shed_frame = matches!(state, PressureState::Saturated) + || (matches!(state, PressureState::Shedding) + && flow.fairness.standing_state == StandingQueueState::Standing + && flow.fairness.consecutive_stalls + >= self.config.max_consecutive_stalls_before_shed); - if should_shed_frame { - self.shed_drops = self.shed_drops.saturating_add(1); - self.fairness_penalties = self.fairness_penalties.saturating_add(1); - } else { - let frame_bytes = candidate.frame.queued_bytes(); - flow.queue.push_front(candidate.frame); - flow.fairness.pending_bytes = - flow.fairness.pending_bytes.saturating_add(frame_bytes); - flow.fairness.queue_started_at = - flow.queue.front().map(|front| front.enqueued_at); - self.total_queued_bytes = self.total_queued_bytes.saturating_add(frame_bytes); - self.bucket_queued_bytes[flow.fairness.bucket_id] = self.bucket_queued_bytes - [flow.fairness.bucket_id] - .saturating_add(frame_bytes); - if !flow.fairness.in_active_ring { - flow.fairness.in_active_ring = true; - self.active_ring.push_back(conn_id); + if should_shed_frame { + self.shed_drops = self.shed_drops.saturating_add(1); + self.fairness_penalties = self.fairness_penalties.saturating_add(1); + } else { + let frame_bytes = candidate.frame.queued_bytes(); + flow.queue.push_front(candidate.frame); + flow.fairness.pending_bytes = + flow.fairness.pending_bytes.saturating_add(frame_bytes); + flow.fairness.queue_started_at = + flow.queue.front().map(|front| front.enqueued_at); + self.total_queued_bytes = + self.total_queued_bytes.saturating_add(frame_bytes); + self.bucket_queued_bytes[flow.fairness.bucket_id] = self + .bucket_queued_bytes[flow.fairness.bucket_id] + .saturating_add(frame_bytes); + if !flow.fairness.in_active_ring { + flow.fairness.in_active_ring = true; + enqueue_active = true; + } } - } - if flow.fairness.consecutive_stalls - >= self.config.max_consecutive_stalls_before_close - && self.pressure.state() == PressureState::Saturated - { + Self::classify_flow(&self.config, state, now, &mut flow.fairness); + let after_membership = Self::flow_membership(&flow.fairness); + let should_close_flow = flow.fairness.consecutive_stalls + >= self.config.max_consecutive_stalls_before_close + && self.pressure.state() == PressureState::Saturated; + ( + before_membership, + after_membership, + should_close_flow, + enqueue_active, + ) + }; + if enqueue_active { + self.enqueue_active_conn(conn_id); + } + self.apply_flow_membership_delta(before_membership, after_membership); + + if should_close_flow { self.remove_flow(conn_id); self.evaluate_pressure(now, true); return DispatchAction::CloseFlow; @@ -426,6 +499,15 @@ impl WorkerFairnessState { let Some(entry) = self.flows.remove(&conn_id) else { return; }; + self.active_ring_members.remove(&conn_id); + self.active_ring.retain(|queued_conn_id| *queued_conn_id != conn_id); + let (was_standing, was_backpressured) = Self::flow_membership(&entry.fairness); + if was_standing { + self.standing_flow_count = self.standing_flow_count.saturating_sub(1); + } + if was_backpressured { + self.backpressured_flow_count = self.backpressured_flow_count.saturating_sub(1); + } self.bucket_active_flows[entry.fairness.bucket_id] = self.bucket_active_flows[entry.fairness.bucket_id].saturating_sub(1); @@ -440,27 +522,6 @@ impl WorkerFairnessState { } fn evaluate_pressure(&mut self, now: Instant, force: bool) { - let mut standing = 0usize; - let mut backpressured = 0usize; - - for flow in self.flows.values_mut() { - Self::classify_flow(&self.config, self.pressure.state(), now, &mut flow.fairness); - if flow.fairness.standing_state == StandingQueueState::Standing { - standing = standing.saturating_add(1); - } - if matches!( - flow.fairness.scheduler_state, - FlowSchedulerState::Backpressured - | FlowSchedulerState::Penalized - | FlowSchedulerState::SheddingCandidate - ) { - backpressured = backpressured.saturating_add(1); - } - } - - self.standing_flow_count = standing; - self.backpressured_flow_count = backpressured; - let _ = self.pressure.maybe_evaluate( now, &self.config.pressure, @@ -468,8 +529,8 @@ impl WorkerFairnessState { PressureSignals { active_flows: self.flows.len(), total_queued_bytes: self.total_queued_bytes, - standing_flows: standing, - backpressured_flows: backpressured, + standing_flows: self.standing_flow_count, + backpressured_flows: self.backpressured_flow_count, }, force, ); @@ -481,12 +542,39 @@ impl WorkerFairnessState { now: Instant, fairness: &mut FlowFairnessState, ) { - if fairness.pending_bytes == 0 { - fairness.pressure_class = FlowPressureClass::Healthy; - fairness.standing_state = StandingQueueState::Transient; - fairness.scheduler_state = FlowSchedulerState::Idle; + let (pressure_class, standing_state, scheduler_state, standing) = + Self::derive_flow_classification(config, pressure_state, now, fairness); + fairness.pressure_class = pressure_class; + fairness.standing_state = standing_state; + fairness.scheduler_state = scheduler_state; + if scheduler_state == FlowSchedulerState::Idle { + fairness.deficit_bytes = 0; + } + if standing { + fairness.penalty_score = fairness.penalty_score.saturating_add(1); + } else { fairness.penalty_score = fairness.penalty_score.saturating_sub(1); - return; + } + } + + fn derive_flow_classification( + config: &WorkerFairnessConfig, + pressure_state: PressureState, + now: Instant, + fairness: &FlowFairnessState, + ) -> ( + FlowPressureClass, + StandingQueueState, + FlowSchedulerState, + bool, + ) { + if fairness.pending_bytes == 0 { + return ( + FlowPressureClass::Healthy, + StandingQueueState::Transient, + FlowSchedulerState::Idle, + false, + ); } let queue_age = fairness @@ -503,29 +591,157 @@ impl WorkerFairnessState { && (fairness.consecutive_stalls >= config.standing_stall_threshold || drain_stalled); if standing { - fairness.standing_state = StandingQueueState::Standing; - fairness.pressure_class = FlowPressureClass::Standing; - fairness.penalty_score = fairness.penalty_score.saturating_add(1); - fairness.scheduler_state = if pressure_state >= PressureState::Shedding { + let scheduler_state = if pressure_state >= PressureState::Shedding { FlowSchedulerState::SheddingCandidate } else { FlowSchedulerState::Penalized }; - return; + return ( + FlowPressureClass::Standing, + StandingQueueState::Standing, + scheduler_state, + true, + ); } - fairness.standing_state = StandingQueueState::Transient; if fairness.consecutive_stalls > 0 { - fairness.pressure_class = FlowPressureClass::Backpressured; - fairness.scheduler_state = FlowSchedulerState::Backpressured; - } else if fairness.pending_bytes >= config.standing_queue_min_backlog_bytes { - fairness.pressure_class = FlowPressureClass::Bursty; - fairness.scheduler_state = FlowSchedulerState::Active; - } else { - fairness.pressure_class = FlowPressureClass::Healthy; - fairness.scheduler_state = FlowSchedulerState::Active; + return ( + FlowPressureClass::Backpressured, + StandingQueueState::Transient, + FlowSchedulerState::Backpressured, + false, + ); } - fairness.penalty_score = fairness.penalty_score.saturating_sub(1); + + if fairness.pending_bytes >= config.standing_queue_min_backlog_bytes { + return ( + FlowPressureClass::Bursty, + StandingQueueState::Transient, + FlowSchedulerState::Active, + false, + ); + } + + ( + FlowPressureClass::Healthy, + StandingQueueState::Transient, + FlowSchedulerState::Active, + false, + ) + } + + #[inline] + fn flow_membership(fairness: &FlowFairnessState) -> (bool, bool) { + ( + fairness.standing_state == StandingQueueState::Standing, + Self::scheduler_state_is_backpressured(fairness.scheduler_state), + ) + } + + #[inline] + fn scheduler_state_is_backpressured(state: FlowSchedulerState) -> bool { + matches!( + state, + FlowSchedulerState::Backpressured + | FlowSchedulerState::Penalized + | FlowSchedulerState::SheddingCandidate + ) + } + + fn apply_flow_membership_delta( + &mut self, + before_membership: (bool, bool), + after_membership: (bool, bool), + ) { + if before_membership.0 != after_membership.0 { + if after_membership.0 { + self.standing_flow_count = self.standing_flow_count.saturating_add(1); + } else { + self.standing_flow_count = self.standing_flow_count.saturating_sub(1); + } + } + if before_membership.1 != after_membership.1 { + if after_membership.1 { + self.backpressured_flow_count = self.backpressured_flow_count.saturating_add(1); + } else { + self.backpressured_flow_count = self.backpressured_flow_count.saturating_sub(1); + } + } + } + + #[inline] + fn clamp_deficit_bytes(config: &WorkerFairnessConfig, fairness: &mut FlowFairnessState) { + let max_deficit = config.max_flow_queued_bytes.min(i64::MAX as u64) as i64; + fairness.deficit_bytes = fairness.deficit_bytes.clamp(0, max_deficit); + } + + #[inline] + fn enqueue_active_conn(&mut self, conn_id: u64) { + if self.active_ring_members.insert(conn_id) { + self.active_ring.push_back(conn_id); + } + } + + #[cfg(test)] + pub(crate) fn debug_recompute_flow_counters(&self, now: Instant) -> (usize, usize) { + let pressure_state = self.pressure.state(); + let mut standing = 0usize; + let mut backpressured = 0usize; + for flow in self.flows.values() { + let (_, standing_state, scheduler_state, _) = + Self::derive_flow_classification(&self.config, pressure_state, now, &flow.fairness); + if standing_state == StandingQueueState::Standing { + standing = standing.saturating_add(1); + } + if Self::scheduler_state_is_backpressured(scheduler_state) { + backpressured = backpressured.saturating_add(1); + } + } + (standing, backpressured) + } + + #[cfg(test)] + pub(crate) fn debug_check_active_ring_consistency(&self) -> bool { + if self.active_ring.len() != self.active_ring_members.len() { + return false; + } + + let mut seen = HashSet::with_capacity(self.active_ring.len()); + for conn_id in self.active_ring.iter().copied() { + if !seen.insert(conn_id) { + return false; + } + if !self.active_ring_members.contains(&conn_id) { + return false; + } + let Some(flow) = self.flows.get(&conn_id) else { + return false; + }; + if !flow.fairness.in_active_ring || flow.queue.is_empty() { + return false; + } + } + + for (conn_id, flow) in self.flows.iter() { + let in_ring = self.active_ring_members.contains(conn_id); + if flow.fairness.in_active_ring != in_ring { + return false; + } + if in_ring && flow.queue.is_empty() { + return false; + } + } + + true + } + + #[cfg(test)] + pub(crate) fn debug_max_deficit_bytes(&self) -> i64 { + self.flows + .values() + .map(|entry| entry.fairness.deficit_bytes) + .max() + .unwrap_or(0) } fn effective_quantum_bytes( diff --git a/src/transport/middle_proxy/tests/fairness_security_tests.rs b/src/transport/middle_proxy/tests/fairness_security_tests.rs index 41a8d86..058a574 100644 --- a/src/transport/middle_proxy/tests/fairness_security_tests.rs +++ b/src/transport/middle_proxy/tests/fairness_security_tests.rs @@ -180,6 +180,12 @@ fn fairness_randomized_sequence_preserves_memory_bounds() { } let snapshot = fairness.snapshot(); + let (standing_recomputed, backpressured_recomputed) = + fairness.debug_recompute_flow_counters(now); assert!(snapshot.total_queued_bytes <= 32 * 1024); + assert_eq!(snapshot.standing_flows, standing_recomputed); + assert_eq!(snapshot.backpressured_flows, backpressured_recomputed); + assert!(fairness.debug_check_active_ring_consistency()); + assert!(fairness.debug_max_deficit_bytes() <= 4 * 1024); } }