diff --git a/src/transport/middle_proxy/fairness/model.rs b/src/transport/middle_proxy/fairness/model.rs index bdf4f9f..99a9330 100644 --- a/src/transport/middle_proxy/fairness/model.rs +++ b/src/transport/middle_proxy/fairness/model.rs @@ -77,11 +77,12 @@ pub(crate) struct FlowFairnessState { pub(crate) standing_state: StandingQueueState, pub(crate) scheduler_state: FlowSchedulerState, pub(crate) bucket_id: usize, + pub(crate) weight_quanta: u8, pub(crate) in_active_ring: bool, } impl FlowFairnessState { - pub(crate) fn new(flow_id: u64, worker_id: u16, bucket_id: usize) -> Self { + pub(crate) fn new(flow_id: u64, worker_id: u16, bucket_id: usize, weight_quanta: u8) -> Self { Self { _flow_id: flow_id, _worker_id: worker_id, @@ -97,6 +98,7 @@ impl FlowFairnessState { standing_state: StandingQueueState::Transient, scheduler_state: FlowSchedulerState::Idle, bucket_id, + weight_quanta: weight_quanta.max(1), in_active_ring: false, } } diff --git a/src/transport/middle_proxy/fairness/pressure.rs b/src/transport/middle_proxy/fairness/pressure.rs index 02a5942..de8d605 100644 --- a/src/transport/middle_proxy/fairness/pressure.rs +++ b/src/transport/middle_proxy/fairness/pressure.rs @@ -146,59 +146,55 @@ impl PressureEvaluator { ((signals.standing_flows.saturating_mul(100)) / signals.active_flows).min(100) as u8 }; - let mut pressure_score = 0u8; + let mut pressured = false; + let mut saturated = false; + let queue_saturated_pct = cfg.queue_ratio_shedding_pct.min(cfg.queue_ratio_saturated_pct); if queue_ratio_pct >= cfg.queue_ratio_pressured_pct { - pressure_score = pressure_score.max(1); + pressured = true; } - if queue_ratio_pct >= cfg.queue_ratio_shedding_pct { - pressure_score = pressure_score.max(2); - } - if queue_ratio_pct >= cfg.queue_ratio_saturated_pct { - pressure_score = pressure_score.max(3); + if queue_ratio_pct >= queue_saturated_pct { + saturated = true; } + let standing_saturated_pct = cfg + .standing_ratio_shedding_pct + .min(cfg.standing_ratio_saturated_pct); if standing_ratio_pct >= cfg.standing_ratio_pressured_pct { - pressure_score = pressure_score.max(1); + pressured = true; } - if standing_ratio_pct >= cfg.standing_ratio_shedding_pct { - pressure_score = pressure_score.max(2); - } - if standing_ratio_pct >= cfg.standing_ratio_saturated_pct { - pressure_score = pressure_score.max(3); + if standing_ratio_pct >= standing_saturated_pct { + saturated = true; } + let rejects_saturated = cfg.rejects_shedding.min(cfg.rejects_saturated); if self.admission_rejects_window >= cfg.rejects_pressured { - pressure_score = pressure_score.max(1); + pressured = true; } - if self.admission_rejects_window >= cfg.rejects_shedding { - pressure_score = pressure_score.max(2); - } - if self.admission_rejects_window >= cfg.rejects_saturated { - pressure_score = pressure_score.max(3); + if self.admission_rejects_window >= rejects_saturated { + saturated = true; } + let stalls_saturated = cfg.stalls_shedding.min(cfg.stalls_saturated); if self.route_stalls_window >= cfg.stalls_pressured { - pressure_score = pressure_score.max(1); + pressured = true; } - if self.route_stalls_window >= cfg.stalls_shedding { - pressure_score = pressure_score.max(2); - } - if self.route_stalls_window >= cfg.stalls_saturated { - pressure_score = pressure_score.max(3); + if self.route_stalls_window >= stalls_saturated { + saturated = true; } if signals.backpressured_flows > signals.active_flows.saturating_div(2) && signals.active_flows > 0 { - pressure_score = pressure_score.max(2); + pressured = true; } - match pressure_score { - 0 => PressureState::Normal, - 1 => PressureState::Pressured, - 2 => PressureState::Shedding, - _ => PressureState::Saturated, + if saturated { + PressureState::Saturated + } else if pressured { + PressureState::Pressured + } else { + PressureState::Normal } } diff --git a/src/transport/middle_proxy/fairness/scheduler.rs b/src/transport/middle_proxy/fairness/scheduler.rs index 4adb8e4..48b6790 100644 --- a/src/transport/middle_proxy/fairness/scheduler.rs +++ b/src/transport/middle_proxy/fairness/scheduler.rs @@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, Instant}; use bytes::Bytes; +use crate::protocol::constants::RPC_FLAG_QUICKACK; use super::model::{ AdmissionDecision, DispatchAction, DispatchCandidate, DispatchFeedback, FlowFairnessState, @@ -26,6 +27,8 @@ pub(crate) struct WorkerFairnessConfig { pub(crate) max_consecutive_stalls_before_close: u8, pub(crate) soft_bucket_count: usize, pub(crate) soft_bucket_share_pct: u8, + pub(crate) default_flow_weight: u8, + pub(crate) quickack_flow_weight: u8, pub(crate) pressure: PressureConfig, } @@ -46,6 +49,8 @@ impl Default for WorkerFairnessConfig { max_consecutive_stalls_before_close: 16, soft_bucket_count: 64, soft_bucket_share_pct: 25, + default_flow_weight: 1, + quickack_flow_weight: 4, pressure: PressureConfig::default(), } } @@ -57,9 +62,9 @@ struct FlowEntry { } impl FlowEntry { - fn new(flow_id: u64, worker_id: u16, bucket_id: usize) -> Self { + fn new(flow_id: u64, worker_id: u16, bucket_id: usize, weight_quanta: u8) -> Self { Self { - fairness: FlowFairnessState::new(flow_id, worker_id, bucket_id), + fairness: FlowFairnessState::new(flow_id, worker_id, bucket_id, weight_quanta), queue: VecDeque::new(), } } @@ -186,6 +191,7 @@ impl WorkerFairnessState { } let bucket_id = self.bucket_for(conn_id); + let frame_weight = Self::weight_for_flags(&self.config, flags); let bucket_cap = self .config .max_total_queued_bytes @@ -207,12 +213,13 @@ impl WorkerFairnessState { self.bucket_active_flows[bucket_id].saturating_add(1); self.flows.insert( conn_id, - FlowEntry::new(conn_id, self.config.worker_id, bucket_id), + FlowEntry::new(conn_id, self.config.worker_id, bucket_id, frame_weight), ); self.flows .get_mut(&conn_id) .expect("flow inserted must be retrievable") }; + entry.fairness.weight_quanta = entry.fairness.weight_quanta.max(frame_weight); if entry.fairness.pending_bytes.saturating_add(frame_bytes) > self.config.max_flow_queued_bytes @@ -682,6 +689,14 @@ impl WorkerFairnessState { } } + #[inline] + fn weight_for_flags(config: &WorkerFairnessConfig, flags: u32) -> u8 { + if (flags & RPC_FLAG_QUICKACK) != 0 { + return config.quickack_flow_weight.max(1); + } + config.default_flow_weight.max(1) + } + #[cfg(test)] pub(crate) fn debug_recompute_flow_counters(&self, now: Instant) -> (usize, usize) { let pressure_state = self.pressure.state(); @@ -758,12 +773,14 @@ impl WorkerFairnessState { return config.penalized_quantum_bytes.max(1); } - match pressure_state { + let base_quantum = match pressure_state { PressureState::Normal => config.base_quantum_bytes.max(1), PressureState::Pressured => config.pressured_quantum_bytes.max(1), PressureState::Shedding => config.pressured_quantum_bytes.max(1), PressureState::Saturated => config.penalized_quantum_bytes.max(1), - } + }; + let weighted_quantum = base_quantum.saturating_mul(fairness.weight_quanta.max(1) as u32); + weighted_quantum.max(1) } fn bucket_for(&self, conn_id: u64) -> usize { diff --git a/src/transport/middle_proxy/tests/fairness_security_tests.rs b/src/transport/middle_proxy/tests/fairness_security_tests.rs index 058a574..eae3237 100644 --- a/src/transport/middle_proxy/tests/fairness_security_tests.rs +++ b/src/transport/middle_proxy/tests/fairness_security_tests.rs @@ -2,6 +2,7 @@ use std::time::{Duration, Instant}; use bytes::Bytes; +use crate::protocol::constants::RPC_FLAG_QUICKACK; use crate::transport::middle_proxy::fairness::{ AdmissionDecision, DispatchAction, DispatchFeedback, PressureState, SchedulerDecision, WorkerFairnessConfig, WorkerFairnessState, @@ -114,6 +115,62 @@ fn fairness_keeps_fast_flow_progress_under_slow_neighbor() { assert!(snapshot.total_queued_bytes <= 64 * 1024); } +#[test] +fn fairness_prioritizes_quickack_flow_when_weights_enabled() { + let mut now = Instant::now(); + let mut fairness = WorkerFairnessState::new( + WorkerFairnessConfig { + max_total_queued_bytes: 256 * 1024, + max_flow_queued_bytes: 64 * 1024, + base_quantum_bytes: 8 * 1024, + pressured_quantum_bytes: 8 * 1024, + penalized_quantum_bytes: 8 * 1024, + default_flow_weight: 1, + quickack_flow_weight: 4, + ..WorkerFairnessConfig::default() + }, + now, + ); + + for _ in 0..8 { + assert_eq!( + fairness.enqueue_data(10, RPC_FLAG_QUICKACK, enqueue_payload(16 * 1024), now), + AdmissionDecision::Admit + ); + assert_eq!( + fairness.enqueue_data(20, 0, enqueue_payload(16 * 1024), now), + AdmissionDecision::Admit + ); + } + + let mut quickack_dispatched = 0u64; + let mut bulk_dispatched = 0u64; + for _ in 0..64 { + now += Duration::from_millis(1); + let SchedulerDecision::Dispatch(candidate) = fairness.next_decision(now) else { + break; + }; + + if candidate.frame.conn_id == 10 { + quickack_dispatched = quickack_dispatched.saturating_add(1); + } else if candidate.frame.conn_id == 20 { + bulk_dispatched = bulk_dispatched.saturating_add(1); + } + + let _ = fairness.apply_dispatch_feedback( + candidate.frame.conn_id, + candidate, + DispatchFeedback::Routed, + now, + ); + } + + assert!( + quickack_dispatched > bulk_dispatched, + "quickack flow must receive higher dispatch rate with larger weight" + ); +} + #[test] fn fairness_pressure_hysteresis_prevents_instant_flapping() { let mut now = Instant::now();