diff --git a/src/transport/middle_proxy/fairness/model.rs b/src/transport/middle_proxy/fairness/model.rs index bdf4f9f..99a9330 100644 --- a/src/transport/middle_proxy/fairness/model.rs +++ b/src/transport/middle_proxy/fairness/model.rs @@ -77,11 +77,12 @@ pub(crate) struct FlowFairnessState { pub(crate) standing_state: StandingQueueState, pub(crate) scheduler_state: FlowSchedulerState, pub(crate) bucket_id: usize, + pub(crate) weight_quanta: u8, pub(crate) in_active_ring: bool, } impl FlowFairnessState { - pub(crate) fn new(flow_id: u64, worker_id: u16, bucket_id: usize) -> Self { + pub(crate) fn new(flow_id: u64, worker_id: u16, bucket_id: usize, weight_quanta: u8) -> Self { Self { _flow_id: flow_id, _worker_id: worker_id, @@ -97,6 +98,7 @@ impl FlowFairnessState { standing_state: StandingQueueState::Transient, scheduler_state: FlowSchedulerState::Idle, bucket_id, + weight_quanta: weight_quanta.max(1), in_active_ring: false, } } diff --git a/src/transport/middle_proxy/fairness/pressure.rs b/src/transport/middle_proxy/fairness/pressure.rs index 02a5942..de8d605 100644 --- a/src/transport/middle_proxy/fairness/pressure.rs +++ b/src/transport/middle_proxy/fairness/pressure.rs @@ -146,59 +146,55 @@ impl PressureEvaluator { ((signals.standing_flows.saturating_mul(100)) / signals.active_flows).min(100) as u8 }; - let mut pressure_score = 0u8; + let mut pressured = false; + let mut saturated = false; + let queue_saturated_pct = cfg.queue_ratio_shedding_pct.min(cfg.queue_ratio_saturated_pct); if queue_ratio_pct >= cfg.queue_ratio_pressured_pct { - pressure_score = pressure_score.max(1); + pressured = true; } - if queue_ratio_pct >= cfg.queue_ratio_shedding_pct { - pressure_score = pressure_score.max(2); - } - if queue_ratio_pct >= cfg.queue_ratio_saturated_pct { - pressure_score = pressure_score.max(3); + if queue_ratio_pct >= queue_saturated_pct { + saturated = true; } + let standing_saturated_pct = cfg + .standing_ratio_shedding_pct + .min(cfg.standing_ratio_saturated_pct); if standing_ratio_pct >= cfg.standing_ratio_pressured_pct { - pressure_score = pressure_score.max(1); + pressured = true; } - if standing_ratio_pct >= cfg.standing_ratio_shedding_pct { - pressure_score = pressure_score.max(2); - } - if standing_ratio_pct >= cfg.standing_ratio_saturated_pct { - pressure_score = pressure_score.max(3); + if standing_ratio_pct >= standing_saturated_pct { + saturated = true; } + let rejects_saturated = cfg.rejects_shedding.min(cfg.rejects_saturated); if self.admission_rejects_window >= cfg.rejects_pressured { - pressure_score = pressure_score.max(1); + pressured = true; } - if self.admission_rejects_window >= cfg.rejects_shedding { - pressure_score = pressure_score.max(2); - } - if self.admission_rejects_window >= cfg.rejects_saturated { - pressure_score = pressure_score.max(3); + if self.admission_rejects_window >= rejects_saturated { + saturated = true; } + let stalls_saturated = cfg.stalls_shedding.min(cfg.stalls_saturated); if self.route_stalls_window >= cfg.stalls_pressured { - pressure_score = pressure_score.max(1); + pressured = true; } - if self.route_stalls_window >= cfg.stalls_shedding { - pressure_score = pressure_score.max(2); - } - if self.route_stalls_window >= cfg.stalls_saturated { - pressure_score = pressure_score.max(3); + if self.route_stalls_window >= stalls_saturated { + saturated = true; } if signals.backpressured_flows > signals.active_flows.saturating_div(2) && signals.active_flows > 0 { - pressure_score = pressure_score.max(2); + pressured = true; } - match pressure_score { - 0 => PressureState::Normal, - 1 => PressureState::Pressured, - 2 => PressureState::Shedding, - _ => PressureState::Saturated, + if saturated { + PressureState::Saturated + } else if pressured { + PressureState::Pressured + } else { + PressureState::Normal } } diff --git a/src/transport/middle_proxy/fairness/scheduler.rs b/src/transport/middle_proxy/fairness/scheduler.rs index 8da3636..48b6790 100644 --- a/src/transport/middle_proxy/fairness/scheduler.rs +++ b/src/transport/middle_proxy/fairness/scheduler.rs @@ -1,7 +1,8 @@ -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, Instant}; use bytes::Bytes; +use crate::protocol::constants::RPC_FLAG_QUICKACK; use super::model::{ AdmissionDecision, DispatchAction, DispatchCandidate, DispatchFeedback, FlowFairnessState, @@ -26,6 +27,8 @@ pub(crate) struct WorkerFairnessConfig { pub(crate) max_consecutive_stalls_before_close: u8, pub(crate) soft_bucket_count: usize, pub(crate) soft_bucket_share_pct: u8, + pub(crate) default_flow_weight: u8, + pub(crate) quickack_flow_weight: u8, pub(crate) pressure: PressureConfig, } @@ -46,6 +49,8 @@ impl Default for WorkerFairnessConfig { max_consecutive_stalls_before_close: 16, soft_bucket_count: 64, soft_bucket_share_pct: 25, + default_flow_weight: 1, + quickack_flow_weight: 4, pressure: PressureConfig::default(), } } @@ -57,9 +62,9 @@ struct FlowEntry { } impl FlowEntry { - fn new(flow_id: u64, worker_id: u16, bucket_id: usize) -> Self { + fn new(flow_id: u64, worker_id: u16, bucket_id: usize, weight_quanta: u8) -> Self { Self { - fairness: FlowFairnessState::new(flow_id, worker_id, bucket_id), + fairness: FlowFairnessState::new(flow_id, worker_id, bucket_id, weight_quanta), queue: VecDeque::new(), } } @@ -86,6 +91,7 @@ pub(crate) struct WorkerFairnessState { pressure: PressureEvaluator, flows: HashMap, active_ring: VecDeque, + active_ring_members: HashSet, total_queued_bytes: u64, bucket_queued_bytes: Vec, bucket_active_flows: Vec, @@ -108,6 +114,7 @@ impl WorkerFairnessState { pressure: PressureEvaluator::new(now), flows: HashMap::new(), active_ring: VecDeque::new(), + active_ring_members: HashSet::new(), total_queued_bytes: 0, bucket_queued_bytes: vec![0; bucket_count], bucket_active_flows: vec![0; bucket_count], @@ -184,6 +191,7 @@ impl WorkerFairnessState { } let bucket_id = self.bucket_for(conn_id); + let frame_weight = Self::weight_for_flags(&self.config, flags); let bucket_cap = self .config .max_total_queued_bytes @@ -205,12 +213,13 @@ impl WorkerFairnessState { self.bucket_active_flows[bucket_id].saturating_add(1); self.flows.insert( conn_id, - FlowEntry::new(conn_id, self.config.worker_id, bucket_id), + FlowEntry::new(conn_id, self.config.worker_id, bucket_id, frame_weight), ); self.flows .get_mut(&conn_id) .expect("flow inserted must be retrievable") }; + entry.fairness.weight_quanta = entry.fairness.weight_quanta.max(frame_weight); if entry.fairness.pending_bytes.saturating_add(frame_bytes) > self.config.max_flow_queued_bytes @@ -242,11 +251,24 @@ impl WorkerFairnessState { self.bucket_queued_bytes[bucket_id] = self.bucket_queued_bytes[bucket_id].saturating_add(frame_bytes); + let mut enqueue_active = false; if !entry.fairness.in_active_ring { entry.fairness.in_active_ring = true; - self.active_ring.push_back(conn_id); + enqueue_active = true; } + let pressure_state = self.pressure.state(); + let (before_membership, after_membership) = { + let before = Self::flow_membership(&entry.fairness); + Self::classify_flow(&self.config, pressure_state, now, &mut entry.fairness); + let after = Self::flow_membership(&entry.fairness); + (before, after) + }; + if enqueue_active { + self.enqueue_active_conn(conn_id); + } + self.apply_flow_membership_delta(before_membership, after_membership); + self.evaluate_pressure(now, true); AdmissionDecision::Admit } @@ -260,62 +282,92 @@ impl WorkerFairnessState { let Some(conn_id) = self.active_ring.pop_front() else { break; }; + if !self.active_ring_members.remove(&conn_id) { + continue; + } let mut candidate = None; let mut requeue_active = false; let mut drained_bytes = 0u64; let mut bucket_id = 0usize; + let mut should_continue = false; + let mut enqueue_active = false; + let mut membership_delta = None; let pressure_state = self.pressure.state(); if let Some(flow) = self.flows.get_mut(&conn_id) { bucket_id = flow.fairness.bucket_id; + flow.fairness.in_active_ring = false; + let before_membership = Self::flow_membership(&flow.fairness); if flow.queue.is_empty() { flow.fairness.in_active_ring = false; flow.fairness.scheduler_state = FlowSchedulerState::Idle; flow.fairness.pending_bytes = 0; + flow.fairness.deficit_bytes = 0; flow.fairness.queue_started_at = None; - continue; - } + should_continue = true; + } else { + Self::classify_flow(&self.config, pressure_state, now, &mut flow.fairness); - Self::classify_flow(&self.config, pressure_state, now, &mut flow.fairness); - - let quantum = - Self::effective_quantum_bytes(&self.config, pressure_state, &flow.fairness); - flow.fairness.deficit_bytes = flow - .fairness - .deficit_bytes - .saturating_add(i64::from(quantum)); - self.deficit_grants = self.deficit_grants.saturating_add(1); - - let front_len = flow.queue.front().map_or(0, |front| front.queued_bytes()); - if flow.fairness.deficit_bytes < front_len as i64 { - flow.fairness.consecutive_skips = - flow.fairness.consecutive_skips.saturating_add(1); - self.deficit_skips = self.deficit_skips.saturating_add(1); - requeue_active = true; - } else if let Some(frame) = flow.queue.pop_front() { - drained_bytes = frame.queued_bytes(); - flow.fairness.pending_bytes = - flow.fairness.pending_bytes.saturating_sub(drained_bytes); + let quantum = Self::effective_quantum_bytes( + &self.config, + pressure_state, + &flow.fairness, + ); flow.fairness.deficit_bytes = flow .fairness .deficit_bytes - .saturating_sub(drained_bytes as i64); - flow.fairness.consecutive_skips = 0; - flow.fairness.queue_started_at = - flow.queue.front().map(|front| front.enqueued_at); - requeue_active = !flow.queue.is_empty(); - if !requeue_active { - flow.fairness.scheduler_state = FlowSchedulerState::Idle; - flow.fairness.in_active_ring = false; + .saturating_add(i64::from(quantum)); + Self::clamp_deficit_bytes(&self.config, &mut flow.fairness); + self.deficit_grants = self.deficit_grants.saturating_add(1); + + let front_len = flow.queue.front().map_or(0, |front| front.queued_bytes()); + if flow.fairness.deficit_bytes < front_len as i64 { + flow.fairness.consecutive_skips = + flow.fairness.consecutive_skips.saturating_add(1); + self.deficit_skips = self.deficit_skips.saturating_add(1); + requeue_active = true; + flow.fairness.in_active_ring = true; + enqueue_active = true; + } else if let Some(frame) = flow.queue.pop_front() { + drained_bytes = frame.queued_bytes(); + flow.fairness.pending_bytes = + flow.fairness.pending_bytes.saturating_sub(drained_bytes); + flow.fairness.deficit_bytes = flow + .fairness + .deficit_bytes + .saturating_sub(drained_bytes as i64); + Self::clamp_deficit_bytes(&self.config, &mut flow.fairness); + flow.fairness.consecutive_skips = 0; + flow.fairness.queue_started_at = + flow.queue.front().map(|front| front.enqueued_at); + requeue_active = !flow.queue.is_empty(); + if !requeue_active { + flow.fairness.scheduler_state = FlowSchedulerState::Idle; + flow.fairness.in_active_ring = false; + flow.fairness.deficit_bytes = 0; + } else { + flow.fairness.in_active_ring = true; + enqueue_active = true; + } + candidate = Some(DispatchCandidate { + pressure_state, + flow_class: flow.fairness.pressure_class, + frame, + }); } - candidate = Some(DispatchCandidate { - pressure_state, - flow_class: flow.fairness.pressure_class, - frame, - }); } + + membership_delta = Some((before_membership, Self::flow_membership(&flow.fairness))); + } + + if let Some((before_membership, after_membership)) = membership_delta { + self.apply_flow_membership_delta(before_membership, after_membership); + } + + if should_continue { + continue; } if drained_bytes > 0 { @@ -324,11 +376,8 @@ impl WorkerFairnessState { self.bucket_queued_bytes[bucket_id].saturating_sub(drained_bytes); } - if requeue_active { - if let Some(flow) = self.flows.get_mut(&conn_id) { - flow.fairness.in_active_ring = true; - } - self.active_ring.push_back(conn_id); + if requeue_active && enqueue_active { + self.enqueue_active_conn(conn_id); } if let Some(candidate) = candidate { @@ -348,7 +397,9 @@ impl WorkerFairnessState { ) -> DispatchAction { match feedback { DispatchFeedback::Routed => { + let mut membership_delta = None; if let Some(flow) = self.flows.get_mut(&conn_id) { + let before_membership = Self::flow_membership(&flow.fairness); flow.fairness.last_drain_at = Some(now); flow.fairness.recent_drain_bytes = flow .fairness @@ -358,6 +409,17 @@ impl WorkerFairnessState { if flow.fairness.scheduler_state != FlowSchedulerState::Idle { flow.fairness.scheduler_state = FlowSchedulerState::Active; } + Self::classify_flow( + &self.config, + self.pressure.state(), + now, + &mut flow.fairness, + ); + membership_delta = + Some((before_membership, Self::flow_membership(&flow.fairness))); + } + if let Some((before_membership, after_membership)) = membership_delta { + self.apply_flow_membership_delta(before_membership, after_membership); } self.evaluate_pressure(now, false); DispatchAction::Continue @@ -365,47 +427,65 @@ impl WorkerFairnessState { DispatchFeedback::QueueFull => { self.pressure.note_route_stall(now, &self.config.pressure); self.downstream_stalls = self.downstream_stalls.saturating_add(1); + let state = self.pressure.state(); let Some(flow) = self.flows.get_mut(&conn_id) else { self.evaluate_pressure(now, true); return DispatchAction::Continue; }; + let (before_membership, after_membership, should_close_flow, enqueue_active) = { + let before_membership = Self::flow_membership(&flow.fairness); + let mut enqueue_active = false; - flow.fairness.consecutive_stalls = - flow.fairness.consecutive_stalls.saturating_add(1); - flow.fairness.scheduler_state = FlowSchedulerState::Backpressured; - flow.fairness.pressure_class = FlowPressureClass::Backpressured; + flow.fairness.consecutive_stalls = + flow.fairness.consecutive_stalls.saturating_add(1); + flow.fairness.scheduler_state = FlowSchedulerState::Backpressured; + flow.fairness.pressure_class = FlowPressureClass::Backpressured; - let state = self.pressure.state(); - let should_shed_frame = matches!(state, PressureState::Saturated) - || (matches!(state, PressureState::Shedding) - && flow.fairness.standing_state == StandingQueueState::Standing - && flow.fairness.consecutive_stalls - >= self.config.max_consecutive_stalls_before_shed); + let should_shed_frame = matches!(state, PressureState::Saturated) + || (matches!(state, PressureState::Shedding) + && flow.fairness.standing_state == StandingQueueState::Standing + && flow.fairness.consecutive_stalls + >= self.config.max_consecutive_stalls_before_shed); - if should_shed_frame { - self.shed_drops = self.shed_drops.saturating_add(1); - self.fairness_penalties = self.fairness_penalties.saturating_add(1); - } else { - let frame_bytes = candidate.frame.queued_bytes(); - flow.queue.push_front(candidate.frame); - flow.fairness.pending_bytes = - flow.fairness.pending_bytes.saturating_add(frame_bytes); - flow.fairness.queue_started_at = - flow.queue.front().map(|front| front.enqueued_at); - self.total_queued_bytes = self.total_queued_bytes.saturating_add(frame_bytes); - self.bucket_queued_bytes[flow.fairness.bucket_id] = self.bucket_queued_bytes - [flow.fairness.bucket_id] - .saturating_add(frame_bytes); - if !flow.fairness.in_active_ring { - flow.fairness.in_active_ring = true; - self.active_ring.push_back(conn_id); + if should_shed_frame { + self.shed_drops = self.shed_drops.saturating_add(1); + self.fairness_penalties = self.fairness_penalties.saturating_add(1); + } else { + let frame_bytes = candidate.frame.queued_bytes(); + flow.queue.push_front(candidate.frame); + flow.fairness.pending_bytes = + flow.fairness.pending_bytes.saturating_add(frame_bytes); + flow.fairness.queue_started_at = + flow.queue.front().map(|front| front.enqueued_at); + self.total_queued_bytes = + self.total_queued_bytes.saturating_add(frame_bytes); + self.bucket_queued_bytes[flow.fairness.bucket_id] = self + .bucket_queued_bytes[flow.fairness.bucket_id] + .saturating_add(frame_bytes); + if !flow.fairness.in_active_ring { + flow.fairness.in_active_ring = true; + enqueue_active = true; + } } - } - if flow.fairness.consecutive_stalls - >= self.config.max_consecutive_stalls_before_close - && self.pressure.state() == PressureState::Saturated - { + Self::classify_flow(&self.config, state, now, &mut flow.fairness); + let after_membership = Self::flow_membership(&flow.fairness); + let should_close_flow = flow.fairness.consecutive_stalls + >= self.config.max_consecutive_stalls_before_close + && self.pressure.state() == PressureState::Saturated; + ( + before_membership, + after_membership, + should_close_flow, + enqueue_active, + ) + }; + if enqueue_active { + self.enqueue_active_conn(conn_id); + } + self.apply_flow_membership_delta(before_membership, after_membership); + + if should_close_flow { self.remove_flow(conn_id); self.evaluate_pressure(now, true); return DispatchAction::CloseFlow; @@ -426,6 +506,15 @@ impl WorkerFairnessState { let Some(entry) = self.flows.remove(&conn_id) else { return; }; + self.active_ring_members.remove(&conn_id); + self.active_ring.retain(|queued_conn_id| *queued_conn_id != conn_id); + let (was_standing, was_backpressured) = Self::flow_membership(&entry.fairness); + if was_standing { + self.standing_flow_count = self.standing_flow_count.saturating_sub(1); + } + if was_backpressured { + self.backpressured_flow_count = self.backpressured_flow_count.saturating_sub(1); + } self.bucket_active_flows[entry.fairness.bucket_id] = self.bucket_active_flows[entry.fairness.bucket_id].saturating_sub(1); @@ -440,27 +529,6 @@ impl WorkerFairnessState { } fn evaluate_pressure(&mut self, now: Instant, force: bool) { - let mut standing = 0usize; - let mut backpressured = 0usize; - - for flow in self.flows.values_mut() { - Self::classify_flow(&self.config, self.pressure.state(), now, &mut flow.fairness); - if flow.fairness.standing_state == StandingQueueState::Standing { - standing = standing.saturating_add(1); - } - if matches!( - flow.fairness.scheduler_state, - FlowSchedulerState::Backpressured - | FlowSchedulerState::Penalized - | FlowSchedulerState::SheddingCandidate - ) { - backpressured = backpressured.saturating_add(1); - } - } - - self.standing_flow_count = standing; - self.backpressured_flow_count = backpressured; - let _ = self.pressure.maybe_evaluate( now, &self.config.pressure, @@ -468,8 +536,8 @@ impl WorkerFairnessState { PressureSignals { active_flows: self.flows.len(), total_queued_bytes: self.total_queued_bytes, - standing_flows: standing, - backpressured_flows: backpressured, + standing_flows: self.standing_flow_count, + backpressured_flows: self.backpressured_flow_count, }, force, ); @@ -481,12 +549,39 @@ impl WorkerFairnessState { now: Instant, fairness: &mut FlowFairnessState, ) { - if fairness.pending_bytes == 0 { - fairness.pressure_class = FlowPressureClass::Healthy; - fairness.standing_state = StandingQueueState::Transient; - fairness.scheduler_state = FlowSchedulerState::Idle; + let (pressure_class, standing_state, scheduler_state, standing) = + Self::derive_flow_classification(config, pressure_state, now, fairness); + fairness.pressure_class = pressure_class; + fairness.standing_state = standing_state; + fairness.scheduler_state = scheduler_state; + if scheduler_state == FlowSchedulerState::Idle { + fairness.deficit_bytes = 0; + } + if standing { + fairness.penalty_score = fairness.penalty_score.saturating_add(1); + } else { fairness.penalty_score = fairness.penalty_score.saturating_sub(1); - return; + } + } + + fn derive_flow_classification( + config: &WorkerFairnessConfig, + pressure_state: PressureState, + now: Instant, + fairness: &FlowFairnessState, + ) -> ( + FlowPressureClass, + StandingQueueState, + FlowSchedulerState, + bool, + ) { + if fairness.pending_bytes == 0 { + return ( + FlowPressureClass::Healthy, + StandingQueueState::Transient, + FlowSchedulerState::Idle, + false, + ); } let queue_age = fairness @@ -503,29 +598,165 @@ impl WorkerFairnessState { && (fairness.consecutive_stalls >= config.standing_stall_threshold || drain_stalled); if standing { - fairness.standing_state = StandingQueueState::Standing; - fairness.pressure_class = FlowPressureClass::Standing; - fairness.penalty_score = fairness.penalty_score.saturating_add(1); - fairness.scheduler_state = if pressure_state >= PressureState::Shedding { + let scheduler_state = if pressure_state >= PressureState::Shedding { FlowSchedulerState::SheddingCandidate } else { FlowSchedulerState::Penalized }; - return; + return ( + FlowPressureClass::Standing, + StandingQueueState::Standing, + scheduler_state, + true, + ); } - fairness.standing_state = StandingQueueState::Transient; if fairness.consecutive_stalls > 0 { - fairness.pressure_class = FlowPressureClass::Backpressured; - fairness.scheduler_state = FlowSchedulerState::Backpressured; - } else if fairness.pending_bytes >= config.standing_queue_min_backlog_bytes { - fairness.pressure_class = FlowPressureClass::Bursty; - fairness.scheduler_state = FlowSchedulerState::Active; - } else { - fairness.pressure_class = FlowPressureClass::Healthy; - fairness.scheduler_state = FlowSchedulerState::Active; + return ( + FlowPressureClass::Backpressured, + StandingQueueState::Transient, + FlowSchedulerState::Backpressured, + false, + ); } - fairness.penalty_score = fairness.penalty_score.saturating_sub(1); + + if fairness.pending_bytes >= config.standing_queue_min_backlog_bytes { + return ( + FlowPressureClass::Bursty, + StandingQueueState::Transient, + FlowSchedulerState::Active, + false, + ); + } + + ( + FlowPressureClass::Healthy, + StandingQueueState::Transient, + FlowSchedulerState::Active, + false, + ) + } + + #[inline] + fn flow_membership(fairness: &FlowFairnessState) -> (bool, bool) { + ( + fairness.standing_state == StandingQueueState::Standing, + Self::scheduler_state_is_backpressured(fairness.scheduler_state), + ) + } + + #[inline] + fn scheduler_state_is_backpressured(state: FlowSchedulerState) -> bool { + matches!( + state, + FlowSchedulerState::Backpressured + | FlowSchedulerState::Penalized + | FlowSchedulerState::SheddingCandidate + ) + } + + fn apply_flow_membership_delta( + &mut self, + before_membership: (bool, bool), + after_membership: (bool, bool), + ) { + if before_membership.0 != after_membership.0 { + if after_membership.0 { + self.standing_flow_count = self.standing_flow_count.saturating_add(1); + } else { + self.standing_flow_count = self.standing_flow_count.saturating_sub(1); + } + } + if before_membership.1 != after_membership.1 { + if after_membership.1 { + self.backpressured_flow_count = self.backpressured_flow_count.saturating_add(1); + } else { + self.backpressured_flow_count = self.backpressured_flow_count.saturating_sub(1); + } + } + } + + #[inline] + fn clamp_deficit_bytes(config: &WorkerFairnessConfig, fairness: &mut FlowFairnessState) { + let max_deficit = config.max_flow_queued_bytes.min(i64::MAX as u64) as i64; + fairness.deficit_bytes = fairness.deficit_bytes.clamp(0, max_deficit); + } + + #[inline] + fn enqueue_active_conn(&mut self, conn_id: u64) { + if self.active_ring_members.insert(conn_id) { + self.active_ring.push_back(conn_id); + } + } + + #[inline] + fn weight_for_flags(config: &WorkerFairnessConfig, flags: u32) -> u8 { + if (flags & RPC_FLAG_QUICKACK) != 0 { + return config.quickack_flow_weight.max(1); + } + config.default_flow_weight.max(1) + } + + #[cfg(test)] + pub(crate) fn debug_recompute_flow_counters(&self, now: Instant) -> (usize, usize) { + let pressure_state = self.pressure.state(); + let mut standing = 0usize; + let mut backpressured = 0usize; + for flow in self.flows.values() { + let (_, standing_state, scheduler_state, _) = + Self::derive_flow_classification(&self.config, pressure_state, now, &flow.fairness); + if standing_state == StandingQueueState::Standing { + standing = standing.saturating_add(1); + } + if Self::scheduler_state_is_backpressured(scheduler_state) { + backpressured = backpressured.saturating_add(1); + } + } + (standing, backpressured) + } + + #[cfg(test)] + pub(crate) fn debug_check_active_ring_consistency(&self) -> bool { + if self.active_ring.len() != self.active_ring_members.len() { + return false; + } + + let mut seen = HashSet::with_capacity(self.active_ring.len()); + for conn_id in self.active_ring.iter().copied() { + if !seen.insert(conn_id) { + return false; + } + if !self.active_ring_members.contains(&conn_id) { + return false; + } + let Some(flow) = self.flows.get(&conn_id) else { + return false; + }; + if !flow.fairness.in_active_ring || flow.queue.is_empty() { + return false; + } + } + + for (conn_id, flow) in self.flows.iter() { + let in_ring = self.active_ring_members.contains(conn_id); + if flow.fairness.in_active_ring != in_ring { + return false; + } + if in_ring && flow.queue.is_empty() { + return false; + } + } + + true + } + + #[cfg(test)] + pub(crate) fn debug_max_deficit_bytes(&self) -> i64 { + self.flows + .values() + .map(|entry| entry.fairness.deficit_bytes) + .max() + .unwrap_or(0) } fn effective_quantum_bytes( @@ -542,12 +773,14 @@ impl WorkerFairnessState { return config.penalized_quantum_bytes.max(1); } - match pressure_state { + let base_quantum = match pressure_state { PressureState::Normal => config.base_quantum_bytes.max(1), PressureState::Pressured => config.pressured_quantum_bytes.max(1), PressureState::Shedding => config.pressured_quantum_bytes.max(1), PressureState::Saturated => config.penalized_quantum_bytes.max(1), - } + }; + let weighted_quantum = base_quantum.saturating_mul(fairness.weight_quanta.max(1) as u32); + weighted_quantum.max(1) } fn bucket_for(&self, conn_id: u64) -> usize { diff --git a/src/transport/middle_proxy/tests/fairness_security_tests.rs b/src/transport/middle_proxy/tests/fairness_security_tests.rs index 41a8d86..eae3237 100644 --- a/src/transport/middle_proxy/tests/fairness_security_tests.rs +++ b/src/transport/middle_proxy/tests/fairness_security_tests.rs @@ -2,6 +2,7 @@ use std::time::{Duration, Instant}; use bytes::Bytes; +use crate::protocol::constants::RPC_FLAG_QUICKACK; use crate::transport::middle_proxy::fairness::{ AdmissionDecision, DispatchAction, DispatchFeedback, PressureState, SchedulerDecision, WorkerFairnessConfig, WorkerFairnessState, @@ -114,6 +115,62 @@ fn fairness_keeps_fast_flow_progress_under_slow_neighbor() { assert!(snapshot.total_queued_bytes <= 64 * 1024); } +#[test] +fn fairness_prioritizes_quickack_flow_when_weights_enabled() { + let mut now = Instant::now(); + let mut fairness = WorkerFairnessState::new( + WorkerFairnessConfig { + max_total_queued_bytes: 256 * 1024, + max_flow_queued_bytes: 64 * 1024, + base_quantum_bytes: 8 * 1024, + pressured_quantum_bytes: 8 * 1024, + penalized_quantum_bytes: 8 * 1024, + default_flow_weight: 1, + quickack_flow_weight: 4, + ..WorkerFairnessConfig::default() + }, + now, + ); + + for _ in 0..8 { + assert_eq!( + fairness.enqueue_data(10, RPC_FLAG_QUICKACK, enqueue_payload(16 * 1024), now), + AdmissionDecision::Admit + ); + assert_eq!( + fairness.enqueue_data(20, 0, enqueue_payload(16 * 1024), now), + AdmissionDecision::Admit + ); + } + + let mut quickack_dispatched = 0u64; + let mut bulk_dispatched = 0u64; + for _ in 0..64 { + now += Duration::from_millis(1); + let SchedulerDecision::Dispatch(candidate) = fairness.next_decision(now) else { + break; + }; + + if candidate.frame.conn_id == 10 { + quickack_dispatched = quickack_dispatched.saturating_add(1); + } else if candidate.frame.conn_id == 20 { + bulk_dispatched = bulk_dispatched.saturating_add(1); + } + + let _ = fairness.apply_dispatch_feedback( + candidate.frame.conn_id, + candidate, + DispatchFeedback::Routed, + now, + ); + } + + assert!( + quickack_dispatched > bulk_dispatched, + "quickack flow must receive higher dispatch rate with larger weight" + ); +} + #[test] fn fairness_pressure_hysteresis_prevents_instant_flapping() { let mut now = Instant::now(); @@ -180,6 +237,12 @@ fn fairness_randomized_sequence_preserves_memory_bounds() { } let snapshot = fairness.snapshot(); + let (standing_recomputed, backpressured_recomputed) = + fairness.debug_recompute_flow_counters(now); assert!(snapshot.total_queued_bytes <= 32 * 1024); + assert_eq!(snapshot.standing_flows, standing_recomputed); + assert_eq!(snapshot.backpressured_flows, backpressured_recomputed); + assert!(fairness.debug_check_active_ring_consistency()); + assert!(fairness.debug_max_deficit_bytes() <= 4 * 1024); } }