diff --git a/Cargo.lock b/Cargo.lock index 49ea79f..9b5ec98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2780,7 +2780,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] name = "telemt" -version = "3.4.0" +version = "3.4.1" dependencies = [ "aes", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 7b95bf5..009f679 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.4.0" +version = "3.4.1" edition = "2024" [features] diff --git a/src/metrics.rs b/src/metrics.rs index 34c0cac..ba44a5f 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1310,6 +1310,143 @@ async fn render_metrics( 0 } ); + let _ = writeln!( + out, + "# HELP telemt_me_fair_pressure_state Worker-local fairness pressure state" + ); + let _ = writeln!(out, "# TYPE telemt_me_fair_pressure_state gauge"); + let _ = writeln!( + out, + "telemt_me_fair_pressure_state {}", + if me_allows_normal { + stats.get_me_fair_pressure_state_gauge() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_fair_active_flows Fair-scheduler active flow count" + ); + let _ = writeln!(out, "# TYPE telemt_me_fair_active_flows gauge"); + let _ = writeln!( + out, + "telemt_me_fair_active_flows {}", + if me_allows_normal { + stats.get_me_fair_active_flows_gauge() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_fair_queued_bytes Fair-scheduler queued bytes" + ); + let _ = writeln!(out, "# TYPE telemt_me_fair_queued_bytes gauge"); + let _ = writeln!( + out, + "telemt_me_fair_queued_bytes {}", + if me_allows_normal { + stats.get_me_fair_queued_bytes_gauge() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_fair_flow_state_gauge Fair-scheduler flow health classes" + ); + let _ = writeln!(out, "# TYPE telemt_me_fair_flow_state_gauge gauge"); + let _ = writeln!( + out, + "telemt_me_fair_flow_state_gauge{{class=\"standing\"}} {}", + if me_allows_normal { + stats.get_me_fair_standing_flows_gauge() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_fair_flow_state_gauge{{class=\"backpressured\"}} {}", + if me_allows_normal { + stats.get_me_fair_backpressured_flows_gauge() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_fair_events_total Fair-scheduler event counters" + ); + let _ = writeln!(out, "# TYPE telemt_me_fair_events_total counter"); + let _ = writeln!( + out, + "telemt_me_fair_events_total{{event=\"scheduler_round\"}} {}", + if me_allows_normal { + stats.get_me_fair_scheduler_rounds_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_fair_events_total{{event=\"deficit_grant\"}} {}", + if me_allows_normal { + stats.get_me_fair_deficit_grants_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_fair_events_total{{event=\"deficit_skip\"}} {}", + if me_allows_normal { + stats.get_me_fair_deficit_skips_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_fair_events_total{{event=\"enqueue_reject\"}} {}", + if me_allows_normal { + stats.get_me_fair_enqueue_rejects_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_fair_events_total{{event=\"shed_drop\"}} {}", + if me_allows_normal { + stats.get_me_fair_shed_drops_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_fair_events_total{{event=\"penalty\"}} {}", + if me_allows_normal { + stats.get_me_fair_penalties_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + 
"telemt_me_fair_events_total{{event=\"downstream_stall\"}} {}", + if me_allows_normal { + stats.get_me_fair_downstream_stalls_total() + } else { + 0 + } + ); let _ = writeln!( out, diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 38b22bb..9609e19 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -175,6 +175,18 @@ pub struct Stats { me_route_drop_queue_full: AtomicU64, me_route_drop_queue_full_base: AtomicU64, me_route_drop_queue_full_high: AtomicU64, + me_fair_pressure_state_gauge: AtomicU64, + me_fair_active_flows_gauge: AtomicU64, + me_fair_queued_bytes_gauge: AtomicU64, + me_fair_standing_flows_gauge: AtomicU64, + me_fair_backpressured_flows_gauge: AtomicU64, + me_fair_scheduler_rounds_total: AtomicU64, + me_fair_deficit_grants_total: AtomicU64, + me_fair_deficit_skips_total: AtomicU64, + me_fair_enqueue_rejects_total: AtomicU64, + me_fair_shed_drops_total: AtomicU64, + me_fair_penalties_total: AtomicU64, + me_fair_downstream_stalls_total: AtomicU64, me_d2c_batches_total: AtomicU64, me_d2c_batch_frames_total: AtomicU64, me_d2c_batch_bytes_total: AtomicU64, @@ -856,6 +868,78 @@ impl Stats { .fetch_add(1, Ordering::Relaxed); } } + pub fn set_me_fair_pressure_state_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_fair_pressure_state_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_fair_active_flows_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_fair_active_flows_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_fair_queued_bytes_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_fair_queued_bytes_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_fair_standing_flows_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_fair_standing_flows_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_fair_backpressured_flows_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_fair_backpressured_flows_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn add_me_fair_scheduler_rounds_total(&self, value: u64) { + if self.telemetry_me_allows_normal() && value > 0 { + self.me_fair_scheduler_rounds_total + .fetch_add(value, Ordering::Relaxed); + } + } + pub fn add_me_fair_deficit_grants_total(&self, value: u64) { + if self.telemetry_me_allows_normal() && value > 0 { + self.me_fair_deficit_grants_total + .fetch_add(value, Ordering::Relaxed); + } + } + pub fn add_me_fair_deficit_skips_total(&self, value: u64) { + if self.telemetry_me_allows_normal() && value > 0 { + self.me_fair_deficit_skips_total + .fetch_add(value, Ordering::Relaxed); + } + } + pub fn add_me_fair_enqueue_rejects_total(&self, value: u64) { + if self.telemetry_me_allows_normal() && value > 0 { + self.me_fair_enqueue_rejects_total + .fetch_add(value, Ordering::Relaxed); + } + } + pub fn add_me_fair_shed_drops_total(&self, value: u64) { + if self.telemetry_me_allows_normal() && value > 0 { + self.me_fair_shed_drops_total + .fetch_add(value, Ordering::Relaxed); + } + } + pub fn add_me_fair_penalties_total(&self, value: u64) { + if self.telemetry_me_allows_normal() && value > 0 { + self.me_fair_penalties_total + .fetch_add(value, Ordering::Relaxed); + } + } + pub fn add_me_fair_downstream_stalls_total(&self, value: u64) { + if self.telemetry_me_allows_normal() && value > 0 { + self.me_fair_downstream_stalls_total + .fetch_add(value, Ordering::Relaxed); + } + } pub fn increment_me_d2c_batches_total(&self) { if 
self.telemetry_me_allows_normal() { self.me_d2c_batches_total.fetch_add(1, Ordering::Relaxed); @@ -1806,6 +1890,43 @@ impl Stats { pub fn get_me_route_drop_queue_full_high(&self) -> u64 { self.me_route_drop_queue_full_high.load(Ordering::Relaxed) } + pub fn get_me_fair_pressure_state_gauge(&self) -> u64 { + self.me_fair_pressure_state_gauge.load(Ordering::Relaxed) + } + pub fn get_me_fair_active_flows_gauge(&self) -> u64 { + self.me_fair_active_flows_gauge.load(Ordering::Relaxed) + } + pub fn get_me_fair_queued_bytes_gauge(&self) -> u64 { + self.me_fair_queued_bytes_gauge.load(Ordering::Relaxed) + } + pub fn get_me_fair_standing_flows_gauge(&self) -> u64 { + self.me_fair_standing_flows_gauge.load(Ordering::Relaxed) + } + pub fn get_me_fair_backpressured_flows_gauge(&self) -> u64 { + self.me_fair_backpressured_flows_gauge + .load(Ordering::Relaxed) + } + pub fn get_me_fair_scheduler_rounds_total(&self) -> u64 { + self.me_fair_scheduler_rounds_total.load(Ordering::Relaxed) + } + pub fn get_me_fair_deficit_grants_total(&self) -> u64 { + self.me_fair_deficit_grants_total.load(Ordering::Relaxed) + } + pub fn get_me_fair_deficit_skips_total(&self) -> u64 { + self.me_fair_deficit_skips_total.load(Ordering::Relaxed) + } + pub fn get_me_fair_enqueue_rejects_total(&self) -> u64 { + self.me_fair_enqueue_rejects_total.load(Ordering::Relaxed) + } + pub fn get_me_fair_shed_drops_total(&self) -> u64 { + self.me_fair_shed_drops_total.load(Ordering::Relaxed) + } + pub fn get_me_fair_penalties_total(&self) -> u64 { + self.me_fair_penalties_total.load(Ordering::Relaxed) + } + pub fn get_me_fair_downstream_stalls_total(&self) -> u64 { + self.me_fair_downstream_stalls_total.load(Ordering::Relaxed) + } pub fn get_me_d2c_batches_total(&self) -> u64 { self.me_d2c_batches_total.load(Ordering::Relaxed) } diff --git a/src/transport/middle_proxy/fairness/mod.rs b/src/transport/middle_proxy/fairness/mod.rs new file mode 100644 index 0000000..1b03f87 --- /dev/null +++ b/src/transport/middle_proxy/fairness/mod.rs @@ -0,0 +1,13 @@ +//! Backpressure-driven fairness control for ME reader routing. +//! +//! This module keeps fairness decisions worker-local: +//! each reader loop owns one scheduler instance and mutates it without locks. 
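+//!
+//! A sketch of the intended driving pattern (illustrative only; simplified
+//! from `reader.rs`, where `cid`, `flags`, and `payload` come from a parsed
+//! RPC_PROXY_ANS frame and the hypothetical `route_downstream` stands in for
+//! the real routing call):
+//!
+//! ```ignore
+//! let mut fair = WorkerFairnessState::new(WorkerFairnessConfig::default(), Instant::now());
+//! if fair.enqueue_data(cid, flags, payload, Instant::now()) == AdmissionDecision::Admit {
+//!     while let SchedulerDecision::Dispatch(c) = fair.next_decision(Instant::now()) {
+//!         let fb = route_downstream(&c.frame); // -> DispatchFeedback
+//!         fair.apply_dispatch_feedback(c.frame.conn_id, c, fb, Instant::now());
+//!     }
+//! }
+//! ```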
+
+mod model;
+mod pressure;
+mod scheduler;
+
+pub(crate) use model::{
+    AdmissionDecision, DispatchAction, DispatchFeedback, PressureState, SchedulerDecision,
+};
+pub(crate) use scheduler::{WorkerFairnessConfig, WorkerFairnessSnapshot, WorkerFairnessState};
diff --git a/src/transport/middle_proxy/fairness/model.rs b/src/transport/middle_proxy/fairness/model.rs
new file mode 100644
index 0000000..bdf4f9f
--- /dev/null
+++ b/src/transport/middle_proxy/fairness/model.rs
@@ -0,0 +1,140 @@
+use std::time::Instant;
+
+use bytes::Bytes;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+#[repr(u8)]
+pub(crate) enum PressureState {
+    Normal = 0,
+    Pressured = 1,
+    Shedding = 2,
+    Saturated = 3,
+}
+
+impl PressureState {
+    pub(crate) fn as_u8(self) -> u8 {
+        self as u8
+    }
+}
+
+impl Default for PressureState {
+    fn default() -> Self {
+        Self::Normal
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum FlowPressureClass {
+    Healthy,
+    Bursty,
+    Backpressured,
+    Standing,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum StandingQueueState {
+    Transient,
+    Standing,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum FlowSchedulerState {
+    Idle,
+    Active,
+    Backpressured,
+    Penalized,
+    SheddingCandidate,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct QueuedFrame {
+    pub(crate) conn_id: u64,
+    pub(crate) flags: u32,
+    pub(crate) data: Bytes,
+    pub(crate) enqueued_at: Instant,
+}
+
+impl QueuedFrame {
+    #[inline]
+    pub(crate) fn queued_bytes(&self) -> u64 {
+        self.data.len() as u64
+    }
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct FlowFairnessState {
+    pub(crate) _flow_id: u64,
+    pub(crate) _worker_id: u16,
+    pub(crate) pending_bytes: u64,
+    pub(crate) deficit_bytes: i64,
+    pub(crate) queue_started_at: Option<Instant>,
+    pub(crate) last_drain_at: Option<Instant>,
+    pub(crate) recent_drain_bytes: u64,
+    pub(crate) consecutive_stalls: u8,
+    pub(crate) consecutive_skips: u8,
+    pub(crate) penalty_score: u16,
+    pub(crate) pressure_class: FlowPressureClass,
+    pub(crate) standing_state: StandingQueueState,
+    pub(crate) scheduler_state: FlowSchedulerState,
+    pub(crate) bucket_id: usize,
+    pub(crate) in_active_ring: bool,
+}
+
+impl FlowFairnessState {
+    pub(crate) fn new(flow_id: u64, worker_id: u16, bucket_id: usize) -> Self {
+        Self {
+            _flow_id: flow_id,
+            _worker_id: worker_id,
+            pending_bytes: 0,
+            deficit_bytes: 0,
+            queue_started_at: None,
+            last_drain_at: None,
+            recent_drain_bytes: 0,
+            consecutive_stalls: 0,
+            consecutive_skips: 0,
+            penalty_score: 0,
+            pressure_class: FlowPressureClass::Healthy,
+            standing_state: StandingQueueState::Transient,
+            scheduler_state: FlowSchedulerState::Idle,
+            bucket_id,
+            in_active_ring: false,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum AdmissionDecision {
+    Admit,
+    RejectWorkerCap,
+    RejectFlowCap,
+    RejectBucketCap,
+    RejectSaturated,
+    RejectStandingFlow,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) enum SchedulerDecision {
+    Idle,
+    Dispatch(DispatchCandidate),
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct DispatchCandidate {
+    pub(crate) frame: QueuedFrame,
+    pub(crate) pressure_state: PressureState,
+    pub(crate) flow_class: FlowPressureClass,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum DispatchFeedback {
+    Routed,
+    QueueFull,
+    ChannelClosed,
+    NoConn,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum DispatchAction {
+    Continue,
+    CloseFlow,
+}
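The `PartialOrd`/`Ord` derive on `PressureState` (ordering follows the `repr(u8)` discriminants) is what lets later code gate on severity with plain comparisons, e.g. this sketch:

    assert!(PressureState::Shedding >= PressureState::Pressured); // Normal < Pressured < Shedding < Saturated
    assert!(PressureState::Saturated > PressureState::Normal);

diff --git a/src/transport/middle_proxy/fairness/pressure.rs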
b/src/transport/middle_proxy/fairness/pressure.rs new file mode 100644 index 0000000..1f068bd --- /dev/null +++ b/src/transport/middle_proxy/fairness/pressure.rs @@ -0,0 +1,214 @@ +use std::time::{Duration, Instant}; + +use super::model::PressureState; + +#[derive(Debug, Clone, Copy)] +pub(crate) struct PressureSignals { + pub(crate) active_flows: usize, + pub(crate) total_queued_bytes: u64, + pub(crate) standing_flows: usize, + pub(crate) backpressured_flows: usize, +} + +#[derive(Debug, Clone)] +pub(crate) struct PressureConfig { + pub(crate) evaluate_every_rounds: u32, + pub(crate) transition_hysteresis_rounds: u8, + pub(crate) standing_ratio_pressured_pct: u8, + pub(crate) standing_ratio_shedding_pct: u8, + pub(crate) standing_ratio_saturated_pct: u8, + pub(crate) queue_ratio_pressured_pct: u8, + pub(crate) queue_ratio_shedding_pct: u8, + pub(crate) queue_ratio_saturated_pct: u8, + pub(crate) reject_window: Duration, + pub(crate) rejects_pressured: u32, + pub(crate) rejects_shedding: u32, + pub(crate) rejects_saturated: u32, + pub(crate) stalls_pressured: u32, + pub(crate) stalls_shedding: u32, + pub(crate) stalls_saturated: u32, +} + +impl Default for PressureConfig { + fn default() -> Self { + Self { + evaluate_every_rounds: 8, + transition_hysteresis_rounds: 3, + standing_ratio_pressured_pct: 20, + standing_ratio_shedding_pct: 35, + standing_ratio_saturated_pct: 50, + queue_ratio_pressured_pct: 65, + queue_ratio_shedding_pct: 82, + queue_ratio_saturated_pct: 94, + reject_window: Duration::from_secs(2), + rejects_pressured: 32, + rejects_shedding: 96, + rejects_saturated: 256, + stalls_pressured: 32, + stalls_shedding: 96, + stalls_saturated: 256, + } + } +} + +#[derive(Debug)] +pub(crate) struct PressureEvaluator { + state: PressureState, + candidate_state: PressureState, + candidate_hits: u8, + rounds_since_eval: u32, + window_started_at: Instant, + admission_rejects_window: u32, + route_stalls_window: u32, +} + +impl PressureEvaluator { + pub(crate) fn new(now: Instant) -> Self { + Self { + state: PressureState::Normal, + candidate_state: PressureState::Normal, + candidate_hits: 0, + rounds_since_eval: 0, + window_started_at: now, + admission_rejects_window: 0, + route_stalls_window: 0, + } + } + + #[inline] + pub(crate) fn state(&self) -> PressureState { + self.state + } + + pub(crate) fn note_admission_reject(&mut self, now: Instant, cfg: &PressureConfig) { + self.rotate_window_if_needed(now, cfg); + self.admission_rejects_window = self.admission_rejects_window.saturating_add(1); + } + + pub(crate) fn note_route_stall(&mut self, now: Instant, cfg: &PressureConfig) { + self.rotate_window_if_needed(now, cfg); + self.route_stalls_window = self.route_stalls_window.saturating_add(1); + } + + pub(crate) fn maybe_evaluate( + &mut self, + now: Instant, + cfg: &PressureConfig, + max_total_queued_bytes: u64, + signals: PressureSignals, + force: bool, + ) -> PressureState { + self.rotate_window_if_needed(now, cfg); + self.rounds_since_eval = self.rounds_since_eval.saturating_add(1); + if !force && self.rounds_since_eval < cfg.evaluate_every_rounds.max(1) { + return self.state; + } + self.rounds_since_eval = 0; + + let target = self.derive_target_state(cfg, max_total_queued_bytes, signals); + if target == self.state { + self.candidate_state = target; + self.candidate_hits = 0; + return self.state; + } + + if self.candidate_state == target { + self.candidate_hits = self.candidate_hits.saturating_add(1); + } else { + self.candidate_state = target; + self.candidate_hits = 1; + } + + if 
self.candidate_hits >= cfg.transition_hysteresis_rounds.max(1) { + self.state = target; + self.candidate_hits = 0; + } + + self.state + } + + fn derive_target_state( + &self, + cfg: &PressureConfig, + max_total_queued_bytes: u64, + signals: PressureSignals, + ) -> PressureState { + let queue_ratio_pct = if max_total_queued_bytes == 0 { + 100 + } else { + ((signals.total_queued_bytes.saturating_mul(100)) / max_total_queued_bytes) + .min(100) as u8 + }; + + let standing_ratio_pct = if signals.active_flows == 0 { + 0 + } else { + ((signals.standing_flows.saturating_mul(100)) / signals.active_flows).min(100) as u8 + }; + + let mut pressure_score = 0u8; + + if queue_ratio_pct >= cfg.queue_ratio_pressured_pct { + pressure_score = pressure_score.max(1); + } + if queue_ratio_pct >= cfg.queue_ratio_shedding_pct { + pressure_score = pressure_score.max(2); + } + if queue_ratio_pct >= cfg.queue_ratio_saturated_pct { + pressure_score = pressure_score.max(3); + } + + if standing_ratio_pct >= cfg.standing_ratio_pressured_pct { + pressure_score = pressure_score.max(1); + } + if standing_ratio_pct >= cfg.standing_ratio_shedding_pct { + pressure_score = pressure_score.max(2); + } + if standing_ratio_pct >= cfg.standing_ratio_saturated_pct { + pressure_score = pressure_score.max(3); + } + + if self.admission_rejects_window >= cfg.rejects_pressured { + pressure_score = pressure_score.max(1); + } + if self.admission_rejects_window >= cfg.rejects_shedding { + pressure_score = pressure_score.max(2); + } + if self.admission_rejects_window >= cfg.rejects_saturated { + pressure_score = pressure_score.max(3); + } + + if self.route_stalls_window >= cfg.stalls_pressured { + pressure_score = pressure_score.max(1); + } + if self.route_stalls_window >= cfg.stalls_shedding { + pressure_score = pressure_score.max(2); + } + if self.route_stalls_window >= cfg.stalls_saturated { + pressure_score = pressure_score.max(3); + } + + if signals.backpressured_flows > signals.active_flows.saturating_div(2) + && signals.active_flows > 0 + { + pressure_score = pressure_score.max(2); + } + + match pressure_score { + 0 => PressureState::Normal, + 1 => PressureState::Pressured, + 2 => PressureState::Shedding, + _ => PressureState::Saturated, + } + } + + fn rotate_window_if_needed(&mut self, now: Instant, cfg: &PressureConfig) { + if now.saturating_duration_since(self.window_started_at) < cfg.reject_window { + return; + } + + self.window_started_at = now; + self.admission_rejects_window = 0; + self.route_stalls_window = 0; + } +} diff --git a/src/transport/middle_proxy/fairness/scheduler.rs b/src/transport/middle_proxy/fairness/scheduler.rs new file mode 100644 index 0000000..b4cc5d8 --- /dev/null +++ b/src/transport/middle_proxy/fairness/scheduler.rs @@ -0,0 +1,545 @@ +use std::collections::{HashMap, VecDeque}; +use std::time::{Duration, Instant}; + +use bytes::Bytes; + +use super::model::{ + AdmissionDecision, DispatchAction, DispatchCandidate, DispatchFeedback, FlowFairnessState, + FlowPressureClass, FlowSchedulerState, PressureState, QueuedFrame, SchedulerDecision, + StandingQueueState, +}; +use super::pressure::{PressureConfig, PressureEvaluator, PressureSignals}; + +#[derive(Debug, Clone)] +pub(crate) struct WorkerFairnessConfig { + pub(crate) worker_id: u16, + pub(crate) max_active_flows: usize, + pub(crate) max_total_queued_bytes: u64, + pub(crate) max_flow_queued_bytes: u64, + pub(crate) base_quantum_bytes: u32, + pub(crate) pressured_quantum_bytes: u32, + pub(crate) penalized_quantum_bytes: u32, + pub(crate) 
standing_queue_min_age: Duration,
+    pub(crate) standing_queue_min_backlog_bytes: u64,
+    pub(crate) standing_stall_threshold: u8,
+    pub(crate) max_consecutive_stalls_before_shed: u8,
+    pub(crate) max_consecutive_stalls_before_close: u8,
+    pub(crate) soft_bucket_count: usize,
+    pub(crate) soft_bucket_share_pct: u8,
+    pub(crate) pressure: PressureConfig,
+}
+
+impl Default for WorkerFairnessConfig {
+    fn default() -> Self {
+        Self {
+            worker_id: 0,
+            max_active_flows: 4096,
+            max_total_queued_bytes: 16 * 1024 * 1024,
+            max_flow_queued_bytes: 512 * 1024,
+            base_quantum_bytes: 32 * 1024,
+            pressured_quantum_bytes: 16 * 1024,
+            penalized_quantum_bytes: 8 * 1024,
+            standing_queue_min_age: Duration::from_millis(250),
+            standing_queue_min_backlog_bytes: 64 * 1024,
+            standing_stall_threshold: 3,
+            max_consecutive_stalls_before_shed: 4,
+            max_consecutive_stalls_before_close: 16,
+            soft_bucket_count: 64,
+            soft_bucket_share_pct: 25,
+            pressure: PressureConfig::default(),
+        }
+    }
+}
+
+struct FlowEntry {
+    fairness: FlowFairnessState,
+    queue: VecDeque<QueuedFrame>,
+}
+
+impl FlowEntry {
+    fn new(flow_id: u64, worker_id: u16, bucket_id: usize) -> Self {
+        Self {
+            fairness: FlowFairnessState::new(flow_id, worker_id, bucket_id),
+            queue: VecDeque::new(),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, Default)]
+pub(crate) struct WorkerFairnessSnapshot {
+    pub(crate) pressure_state: PressureState,
+    pub(crate) active_flows: usize,
+    pub(crate) total_queued_bytes: u64,
+    pub(crate) standing_flows: usize,
+    pub(crate) backpressured_flows: usize,
+    pub(crate) scheduler_rounds: u64,
+    pub(crate) deficit_grants: u64,
+    pub(crate) deficit_skips: u64,
+    pub(crate) enqueue_rejects: u64,
+    pub(crate) shed_drops: u64,
+    pub(crate) fairness_penalties: u64,
+    pub(crate) downstream_stalls: u64,
+}
+
+pub(crate) struct WorkerFairnessState {
+    config: WorkerFairnessConfig,
+    pressure: PressureEvaluator,
+    flows: HashMap<u64, FlowEntry>,
+    active_ring: VecDeque<u64>,
+    total_queued_bytes: u64,
+    bucket_queued_bytes: Vec<u64>,
+    bucket_active_flows: Vec<usize>,
+    standing_flow_count: usize,
+    backpressured_flow_count: usize,
+    scheduler_rounds: u64,
+    deficit_grants: u64,
+    deficit_skips: u64,
+    enqueue_rejects: u64,
+    shed_drops: u64,
+    fairness_penalties: u64,
+    downstream_stalls: u64,
+}
+
+impl WorkerFairnessState {
+    pub(crate) fn new(config: WorkerFairnessConfig, now: Instant) -> Self {
+        let bucket_count = config.soft_bucket_count.max(1);
+        Self {
+            config,
+            pressure: PressureEvaluator::new(now),
+            flows: HashMap::new(),
+            active_ring: VecDeque::new(),
+            total_queued_bytes: 0,
+            bucket_queued_bytes: vec![0; bucket_count],
+            bucket_active_flows: vec![0; bucket_count],
+            standing_flow_count: 0,
+            backpressured_flow_count: 0,
+            scheduler_rounds: 0,
+            deficit_grants: 0,
+            deficit_skips: 0,
+            enqueue_rejects: 0,
+            shed_drops: 0,
+            fairness_penalties: 0,
+            downstream_stalls: 0,
+        }
+    }
+
+    pub(crate) fn pressure_state(&self) -> PressureState {
+        self.pressure.state()
+    }
+
+    pub(crate) fn snapshot(&self) -> WorkerFairnessSnapshot {
+        WorkerFairnessSnapshot {
+            pressure_state: self.pressure.state(),
+            active_flows: self.flows.len(),
+            total_queued_bytes: self.total_queued_bytes,
+            standing_flows: self.standing_flow_count,
+            backpressured_flows: self.backpressured_flow_count,
+            scheduler_rounds: self.scheduler_rounds,
+            deficit_grants: self.deficit_grants,
+            deficit_skips: self.deficit_skips,
+            enqueue_rejects: self.enqueue_rejects,
+            shed_drops: self.shed_drops,
+            fairness_penalties: self.fairness_penalties,
+            downstream_stalls: self.downstream_stalls,
+        }
+    }
+
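+    // Admission checks in `enqueue_data` run in a fixed order: saturation
+    // gate, worker-wide byte budget, active-flow count, soft per-bucket
+    // byte cap, per-flow byte cap, and finally a standing-flow gate that
+    // only applies once the worker has reached `PressureState::Shedding`.
+    // Every rejection also feeds the pressure evaluator, so sustained
+    // rejects escalate the pressure state.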
pub(crate) fn enqueue_data( + &mut self, + conn_id: u64, + flags: u32, + data: Bytes, + now: Instant, + ) -> AdmissionDecision { + let frame = QueuedFrame { + conn_id, + flags, + data, + enqueued_at: now, + }; + let frame_bytes = frame.queued_bytes(); + + if self.pressure.state() == PressureState::Saturated { + self.pressure + .note_admission_reject(now, &self.config.pressure); + self.enqueue_rejects = self.enqueue_rejects.saturating_add(1); + return AdmissionDecision::RejectSaturated; + } + + if self.total_queued_bytes.saturating_add(frame_bytes) > self.config.max_total_queued_bytes { + self.pressure + .note_admission_reject(now, &self.config.pressure); + self.enqueue_rejects = self.enqueue_rejects.saturating_add(1); + self.evaluate_pressure(now, true); + return AdmissionDecision::RejectWorkerCap; + } + + if !self.flows.contains_key(&conn_id) && self.flows.len() >= self.config.max_active_flows { + self.pressure + .note_admission_reject(now, &self.config.pressure); + self.enqueue_rejects = self.enqueue_rejects.saturating_add(1); + self.evaluate_pressure(now, true); + return AdmissionDecision::RejectWorkerCap; + } + + let bucket_id = self.bucket_for(conn_id); + let bucket_cap = self + .config + .max_total_queued_bytes + .saturating_mul(self.config.soft_bucket_share_pct.max(1) as u64) + .saturating_div(100) + .max(self.config.max_flow_queued_bytes); + if self.bucket_queued_bytes[bucket_id].saturating_add(frame_bytes) > bucket_cap { + self.pressure + .note_admission_reject(now, &self.config.pressure); + self.enqueue_rejects = self.enqueue_rejects.saturating_add(1); + self.evaluate_pressure(now, true); + return AdmissionDecision::RejectBucketCap; + } + + let entry = if let Some(flow) = self.flows.get_mut(&conn_id) { + flow + } else { + self.bucket_active_flows[bucket_id] = + self.bucket_active_flows[bucket_id].saturating_add(1); + self.flows.insert( + conn_id, + FlowEntry::new(conn_id, self.config.worker_id, bucket_id), + ); + self.flows + .get_mut(&conn_id) + .expect("flow inserted must be retrievable") + }; + + if entry.fairness.pending_bytes.saturating_add(frame_bytes) > self.config.max_flow_queued_bytes + { + self.pressure + .note_admission_reject(now, &self.config.pressure); + self.enqueue_rejects = self.enqueue_rejects.saturating_add(1); + self.evaluate_pressure(now, true); + return AdmissionDecision::RejectFlowCap; + } + + if self.pressure.state() >= PressureState::Shedding + && entry.fairness.standing_state == StandingQueueState::Standing + { + self.pressure + .note_admission_reject(now, &self.config.pressure); + self.enqueue_rejects = self.enqueue_rejects.saturating_add(1); + self.evaluate_pressure(now, true); + return AdmissionDecision::RejectStandingFlow; + } + + entry.fairness.pending_bytes = entry.fairness.pending_bytes.saturating_add(frame_bytes); + if entry.fairness.queue_started_at.is_none() { + entry.fairness.queue_started_at = Some(now); + } + entry.queue.push_back(frame); + + self.total_queued_bytes = self.total_queued_bytes.saturating_add(frame_bytes); + self.bucket_queued_bytes[bucket_id] = self.bucket_queued_bytes[bucket_id].saturating_add(frame_bytes); + + if !entry.fairness.in_active_ring { + entry.fairness.in_active_ring = true; + self.active_ring.push_back(conn_id); + } + + self.evaluate_pressure(now, true); + AdmissionDecision::Admit + } + + pub(crate) fn next_decision(&mut self, now: Instant) -> SchedulerDecision { + self.scheduler_rounds = self.scheduler_rounds.saturating_add(1); + self.evaluate_pressure(now, false); + + let active_len = self.active_ring.len(); + 
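+        // One deficit-round-robin pass over the active ring: each visited
+        // flow earns a pressure-dependent quantum; if the head frame still
+        // exceeds the accumulated deficit the flow is skipped (keeping its
+        // credit for the next pass), otherwise the frame is dequeued and
+        // offered for dispatch.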
for _ in 0..active_len { + let Some(conn_id) = self.active_ring.pop_front() else { + break; + }; + + let mut candidate = None; + let mut requeue_active = false; + let mut drained_bytes = 0u64; + let mut bucket_id = 0usize; + let pressure_state = self.pressure.state(); + + if let Some(flow) = self.flows.get_mut(&conn_id) { + bucket_id = flow.fairness.bucket_id; + + if flow.queue.is_empty() { + flow.fairness.in_active_ring = false; + flow.fairness.scheduler_state = FlowSchedulerState::Idle; + flow.fairness.pending_bytes = 0; + flow.fairness.queue_started_at = None; + continue; + } + + Self::classify_flow(&self.config, pressure_state, now, &mut flow.fairness); + + let quantum = Self::effective_quantum_bytes(&self.config, pressure_state, &flow.fairness); + flow.fairness.deficit_bytes = + flow.fairness.deficit_bytes.saturating_add(i64::from(quantum)); + self.deficit_grants = self.deficit_grants.saturating_add(1); + + let front_len = flow.queue.front().map_or(0, |front| front.queued_bytes()); + if flow.fairness.deficit_bytes < front_len as i64 { + flow.fairness.consecutive_skips = flow.fairness.consecutive_skips.saturating_add(1); + self.deficit_skips = self.deficit_skips.saturating_add(1); + requeue_active = true; + } else if let Some(frame) = flow.queue.pop_front() { + drained_bytes = frame.queued_bytes(); + flow.fairness.pending_bytes = flow.fairness.pending_bytes.saturating_sub(drained_bytes); + flow.fairness.deficit_bytes = + flow.fairness.deficit_bytes.saturating_sub(drained_bytes as i64); + flow.fairness.consecutive_skips = 0; + flow.fairness.queue_started_at = flow.queue.front().map(|front| front.enqueued_at); + requeue_active = !flow.queue.is_empty(); + if !requeue_active { + flow.fairness.scheduler_state = FlowSchedulerState::Idle; + flow.fairness.in_active_ring = false; + } + candidate = Some(DispatchCandidate { + pressure_state, + flow_class: flow.fairness.pressure_class, + frame, + }); + } + } + + if drained_bytes > 0 { + self.total_queued_bytes = self.total_queued_bytes.saturating_sub(drained_bytes); + self.bucket_queued_bytes[bucket_id] = + self.bucket_queued_bytes[bucket_id].saturating_sub(drained_bytes); + } + + if requeue_active { + if let Some(flow) = self.flows.get_mut(&conn_id) { + flow.fairness.in_active_ring = true; + } + self.active_ring.push_back(conn_id); + } + + if let Some(candidate) = candidate { + return SchedulerDecision::Dispatch(candidate); + } + } + + SchedulerDecision::Idle + } + + pub(crate) fn apply_dispatch_feedback( + &mut self, + conn_id: u64, + candidate: DispatchCandidate, + feedback: DispatchFeedback, + now: Instant, + ) -> DispatchAction { + match feedback { + DispatchFeedback::Routed => { + if let Some(flow) = self.flows.get_mut(&conn_id) { + flow.fairness.last_drain_at = Some(now); + flow.fairness.recent_drain_bytes = flow + .fairness + .recent_drain_bytes + .saturating_add(candidate.frame.queued_bytes()); + flow.fairness.consecutive_stalls = 0; + if flow.fairness.scheduler_state != FlowSchedulerState::Idle { + flow.fairness.scheduler_state = FlowSchedulerState::Active; + } + } + self.evaluate_pressure(now, false); + DispatchAction::Continue + } + DispatchFeedback::QueueFull => { + self.pressure.note_route_stall(now, &self.config.pressure); + self.downstream_stalls = self.downstream_stalls.saturating_add(1); + let Some(flow) = self.flows.get_mut(&conn_id) else { + self.evaluate_pressure(now, true); + return DispatchAction::Continue; + }; + + flow.fairness.consecutive_stalls = flow.fairness.consecutive_stalls.saturating_add(1); + 
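+                // QueueFull counts as a stall. At Saturated pressure the
+                // frame is shed outright; at Shedding, a standing flow sheds
+                // once enough consecutive stalls accumulate; otherwise the
+                // frame is pushed back to the front of the flow's queue.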
flow.fairness.scheduler_state = FlowSchedulerState::Backpressured; + flow.fairness.pressure_class = FlowPressureClass::Backpressured; + + let state = self.pressure.state(); + let should_shed_frame = matches!(state, PressureState::Saturated) + || (matches!(state, PressureState::Shedding) + && flow.fairness.standing_state == StandingQueueState::Standing + && flow.fairness.consecutive_stalls + >= self.config.max_consecutive_stalls_before_shed); + + if should_shed_frame { + self.shed_drops = self.shed_drops.saturating_add(1); + self.fairness_penalties = self.fairness_penalties.saturating_add(1); + } else { + let frame_bytes = candidate.frame.queued_bytes(); + flow.queue.push_front(candidate.frame); + flow.fairness.pending_bytes = flow.fairness.pending_bytes.saturating_add(frame_bytes); + if flow.fairness.queue_started_at.is_none() { + flow.fairness.queue_started_at = Some(now); + } + self.total_queued_bytes = self.total_queued_bytes.saturating_add(frame_bytes); + self.bucket_queued_bytes[flow.fairness.bucket_id] = self.bucket_queued_bytes + [flow.fairness.bucket_id] + .saturating_add(frame_bytes); + if !flow.fairness.in_active_ring { + flow.fairness.in_active_ring = true; + self.active_ring.push_back(conn_id); + } + } + + if flow.fairness.consecutive_stalls >= self.config.max_consecutive_stalls_before_close + && self.pressure.state() == PressureState::Saturated + { + self.remove_flow(conn_id); + self.evaluate_pressure(now, true); + return DispatchAction::CloseFlow; + } + + self.evaluate_pressure(now, true); + DispatchAction::Continue + } + DispatchFeedback::ChannelClosed | DispatchFeedback::NoConn => { + self.remove_flow(conn_id); + self.evaluate_pressure(now, true); + DispatchAction::CloseFlow + } + } + } + + pub(crate) fn remove_flow(&mut self, conn_id: u64) { + let Some(entry) = self.flows.remove(&conn_id) else { + return; + }; + + self.bucket_active_flows[entry.fairness.bucket_id] = self.bucket_active_flows + [entry.fairness.bucket_id] + .saturating_sub(1); + + let mut reclaimed = 0u64; + for frame in entry.queue { + reclaimed = reclaimed.saturating_add(frame.queued_bytes()); + } + self.total_queued_bytes = self.total_queued_bytes.saturating_sub(reclaimed); + self.bucket_queued_bytes[entry.fairness.bucket_id] = self.bucket_queued_bytes + [entry.fairness.bucket_id] + .saturating_sub(reclaimed); + } + + fn evaluate_pressure(&mut self, now: Instant, force: bool) { + let mut standing = 0usize; + let mut backpressured = 0usize; + + for flow in self.flows.values_mut() { + Self::classify_flow(&self.config, self.pressure.state(), now, &mut flow.fairness); + if flow.fairness.standing_state == StandingQueueState::Standing { + standing = standing.saturating_add(1); + } + if matches!( + flow.fairness.scheduler_state, + FlowSchedulerState::Backpressured + | FlowSchedulerState::Penalized + | FlowSchedulerState::SheddingCandidate + ) { + backpressured = backpressured.saturating_add(1); + } + } + + self.standing_flow_count = standing; + self.backpressured_flow_count = backpressured; + + let _ = self.pressure.maybe_evaluate( + now, + &self.config.pressure, + self.config.max_total_queued_bytes, + PressureSignals { + active_flows: self.flows.len(), + total_queued_bytes: self.total_queued_bytes, + standing_flows: standing, + backpressured_flows: backpressured, + }, + force, + ); + } + + fn classify_flow( + config: &WorkerFairnessConfig, + pressure_state: PressureState, + now: Instant, + fairness: &mut FlowFairnessState, + ) { + if fairness.pending_bytes == 0 { + fairness.pressure_class = 
FlowPressureClass::Healthy; + fairness.standing_state = StandingQueueState::Transient; + fairness.scheduler_state = FlowSchedulerState::Idle; + fairness.penalty_score = fairness.penalty_score.saturating_sub(1); + return; + } + + let queue_age = fairness + .queue_started_at + .map(|ts| now.saturating_duration_since(ts)) + .unwrap_or_default(); + let drain_stalled = fairness + .last_drain_at + .map(|ts| now.saturating_duration_since(ts) >= config.standing_queue_min_age) + .unwrap_or(true); + + let standing = fairness.pending_bytes >= config.standing_queue_min_backlog_bytes + && queue_age >= config.standing_queue_min_age + && (fairness.consecutive_stalls >= config.standing_stall_threshold || drain_stalled); + + if standing { + fairness.standing_state = StandingQueueState::Standing; + fairness.pressure_class = FlowPressureClass::Standing; + fairness.penalty_score = fairness.penalty_score.saturating_add(1); + fairness.scheduler_state = if pressure_state >= PressureState::Shedding { + FlowSchedulerState::SheddingCandidate + } else { + FlowSchedulerState::Penalized + }; + return; + } + + fairness.standing_state = StandingQueueState::Transient; + if fairness.consecutive_stalls > 0 { + fairness.pressure_class = FlowPressureClass::Backpressured; + fairness.scheduler_state = FlowSchedulerState::Backpressured; + } else if fairness.pending_bytes >= config.standing_queue_min_backlog_bytes { + fairness.pressure_class = FlowPressureClass::Bursty; + fairness.scheduler_state = FlowSchedulerState::Active; + } else { + fairness.pressure_class = FlowPressureClass::Healthy; + fairness.scheduler_state = FlowSchedulerState::Active; + } + fairness.penalty_score = fairness.penalty_score.saturating_sub(1); + } + + fn effective_quantum_bytes( + config: &WorkerFairnessConfig, + pressure_state: PressureState, + fairness: &FlowFairnessState, + ) -> u32 { + let penalized = matches!( + fairness.scheduler_state, + FlowSchedulerState::Penalized | FlowSchedulerState::SheddingCandidate + ); + + if penalized { + return config.penalized_quantum_bytes.max(1); + } + + match pressure_state { + PressureState::Normal => config.base_quantum_bytes.max(1), + PressureState::Pressured => config.pressured_quantum_bytes.max(1), + PressureState::Shedding => config.pressured_quantum_bytes.max(1), + PressureState::Saturated => config.penalized_quantum_bytes.max(1), + } + } + + fn bucket_for(&self, conn_id: u64) -> usize { + (conn_id as usize) % self.bucket_queued_bytes.len().max(1) + } +} diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index 6dfbee6..3ebe190 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -2,6 +2,7 @@ mod codec; mod config_updater; +mod fairness; mod handshake; mod health; #[cfg(test)] @@ -30,6 +31,9 @@ mod pool_writer; #[cfg(test)] #[path = "tests/pool_writer_security_tests.rs"] mod pool_writer_security_tests; +#[cfg(test)] +#[path = "tests/fairness_security_tests.rs"] +mod fairness_security_tests; mod reader; mod registry; mod rotation; diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index dbfd9d7..3000628 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -20,11 +20,15 @@ use crate::protocol::constants::*; use crate::stats::Stats; use super::codec::{RpcChecksumMode, WriterCommand, rpc_crc}; +use super::fairness::{ + AdmissionDecision, DispatchAction, DispatchFeedback, SchedulerDecision, WorkerFairnessConfig, + WorkerFairnessSnapshot, WorkerFairnessState, +}; use 
super::registry::RouteResult;
 use super::{ConnRegistry, MeResponse};
 
-
 const DATA_ROUTE_MAX_ATTEMPTS: usize = 3;
 const DATA_ROUTE_QUEUE_FULL_STARVATION_THRESHOLD: u8 = 3;
+const FAIRNESS_DRAIN_BUDGET_PER_LOOP: usize = 128;
 
 fn should_close_on_route_result_for_data(result: RouteResult) -> bool {
     matches!(result, RouteResult::NoConn | RouteResult::ChannelClosed)
@@ -77,6 +81,116 @@ async fn route_data_with_retry(
     }
 }
 
+#[inline]
+fn route_feedback(result: RouteResult) -> DispatchFeedback {
+    match result {
+        RouteResult::Routed => DispatchFeedback::Routed,
+        RouteResult::NoConn => DispatchFeedback::NoConn,
+        RouteResult::ChannelClosed => DispatchFeedback::ChannelClosed,
+        RouteResult::QueueFullBase | RouteResult::QueueFullHigh => DispatchFeedback::QueueFull,
+    }
+}
+
+fn report_route_drop(result: RouteResult, stats: &Stats) {
+    match result {
+        RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
+        RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(),
+        RouteResult::QueueFullBase => {
+            stats.increment_me_route_drop_queue_full();
+            stats.increment_me_route_drop_queue_full_base();
+        }
+        RouteResult::QueueFullHigh => {
+            stats.increment_me_route_drop_queue_full();
+            stats.increment_me_route_drop_queue_full_high();
+        }
+        RouteResult::Routed => {}
+    }
+}
+
+fn apply_fairness_metrics_delta(
+    stats: &Stats,
+    prev: &mut WorkerFairnessSnapshot,
+    current: WorkerFairnessSnapshot,
+) {
+    stats.set_me_fair_active_flows_gauge(current.active_flows as u64);
+    stats.set_me_fair_queued_bytes_gauge(current.total_queued_bytes);
+    stats.set_me_fair_standing_flows_gauge(current.standing_flows as u64);
+    stats.set_me_fair_backpressured_flows_gauge(current.backpressured_flows as u64);
+    stats.set_me_fair_pressure_state_gauge(current.pressure_state.as_u8() as u64);
+    stats.add_me_fair_scheduler_rounds_total(
+        current.scheduler_rounds.saturating_sub(prev.scheduler_rounds),
+    );
+    stats.add_me_fair_deficit_grants_total(
+        current.deficit_grants.saturating_sub(prev.deficit_grants),
+    );
+    stats.add_me_fair_deficit_skips_total(
+        current.deficit_skips.saturating_sub(prev.deficit_skips),
+    );
+    stats.add_me_fair_enqueue_rejects_total(
+        current.enqueue_rejects.saturating_sub(prev.enqueue_rejects),
+    );
+    stats.add_me_fair_shed_drops_total(current.shed_drops.saturating_sub(prev.shed_drops));
+    stats.add_me_fair_penalties_total(
+        current.fairness_penalties.saturating_sub(prev.fairness_penalties),
+    );
+    stats.add_me_fair_downstream_stalls_total(
+        current
+            .downstream_stalls
+            .saturating_sub(prev.downstream_stalls),
+    );
+    *prev = current;
+}
+
+async fn drain_fairness_scheduler(
+    fairness: &mut WorkerFairnessState,
+    reg: &ConnRegistry,
+    tx: &mpsc::Sender<WriterCommand>,
+    data_route_queue_full_streak: &mut HashMap<u64, u8>,
+    route_wait_ms: u64,
+    stats: &Stats,
+) {
+    for _ in 0..FAIRNESS_DRAIN_BUDGET_PER_LOOP {
+        let now = Instant::now();
+        let SchedulerDecision::Dispatch(candidate) = fairness.next_decision(now) else {
+            break;
+        };
+        let cid = candidate.frame.conn_id;
+        let _pressure_state = candidate.pressure_state;
+        let _flow_class = candidate.flow_class;
+        let routed = route_data_with_retry(
+            reg,
+            cid,
+            candidate.frame.flags,
+            candidate.frame.data.clone(),
+            route_wait_ms,
+        )
+        .await;
+        if matches!(routed, RouteResult::Routed) {
+            data_route_queue_full_streak.remove(&cid);
+        } else {
+            report_route_drop(routed, stats);
+        }
+        let action = fairness.apply_dispatch_feedback(cid, candidate, route_feedback(routed), now);
+        if is_data_route_queue_full(routed) {
+            let streak = data_route_queue_full_streak.entry(cid).or_insert(0);
+            *streak = streak.saturating_add(1);
+            if should_close_on_queue_full_streak(*streak) {
+                fairness.remove_flow(cid);
+                data_route_queue_full_streak.remove(&cid);
+                reg.unregister(cid).await;
+                send_close_conn(tx, cid).await;
+                continue;
+            }
+        }
+        if action == DispatchAction::CloseFlow || should_close_on_route_result_for_data(routed) {
+            fairness.remove_flow(cid);
+            data_route_queue_full_streak.remove(&cid);
+            reg.unregister(cid).await;
+            send_close_conn(tx, cid).await;
+        }
+    }
+}
+
 pub(crate) async fn reader_loop(
     mut rd: tokio::io::ReadHalf,
     dk: [u8; 32],
@@ -98,7 +212,21 @@ pub(crate) async fn reader_loop(
     let mut raw = enc_leftover;
     let mut expected_seq: i32 = 0;
     let mut data_route_queue_full_streak = HashMap::<u64, u8>::new();
-
+    let mut fairness = WorkerFairnessState::new(
+        WorkerFairnessConfig {
+            worker_id: (writer_id as u16).saturating_add(1),
+            max_active_flows: reg.route_channel_capacity().saturating_mul(4).max(256),
+            max_total_queued_bytes: (reg.route_channel_capacity() as u64)
+                .saturating_mul(16 * 1024)
+                .max(4 * 1024 * 1024),
+            max_flow_queued_bytes: (reg.route_channel_capacity() as u64)
+                .saturating_mul(2 * 1024)
+                .clamp(64 * 1024, 2 * 1024 * 1024),
+            ..WorkerFairnessConfig::default()
+        },
+        Instant::now(),
+    );
+    let mut fairness_snapshot = fairness.snapshot();
     loop {
         let mut tmp = [0u8; 65_536];
         let n = tokio::select! {
@@ -181,36 +309,20 @@
                 let data = body.slice(12..);
                 trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
-                let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
-                let routed =
-                    route_data_with_retry(reg.as_ref(), cid, flags, data, route_wait_ms).await;
-                if matches!(routed, RouteResult::Routed) {
-                    data_route_queue_full_streak.remove(&cid);
-                    continue;
-                }
-                match routed {
-                    RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
-                    RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(),
-                    RouteResult::QueueFullBase => {
-                        stats.increment_me_route_drop_queue_full();
-                        stats.increment_me_route_drop_queue_full_base();
-                    }
-                    RouteResult::QueueFullHigh => {
-                        stats.increment_me_route_drop_queue_full();
-                        stats.increment_me_route_drop_queue_full_high();
-                    }
-                    RouteResult::Routed => {}
-                }
-                if should_close_on_route_result_for_data(routed) {
-                    data_route_queue_full_streak.remove(&cid);
-                    reg.unregister(cid).await;
-                    send_close_conn(&tx, cid).await;
-                    continue;
-                }
-                if is_data_route_queue_full(routed) {
+                let admission = fairness.enqueue_data(cid, flags, data, Instant::now());
+                if !matches!(admission, AdmissionDecision::Admit) {
+                    stats.increment_me_route_drop_queue_full();
+                    stats.increment_me_route_drop_queue_full_high();
                     let streak = data_route_queue_full_streak.entry(cid).or_insert(0);
                     *streak = streak.saturating_add(1);
-                    if should_close_on_queue_full_streak(*streak) {
+                    if should_close_on_queue_full_streak(*streak)
+                        || matches!(
+                            admission,
+                            AdmissionDecision::RejectSaturated
+                                | AdmissionDecision::RejectStandingFlow
+                        )
+                    {
+                        fairness.remove_flow(cid);
                         data_route_queue_full_streak.remove(&cid);
                         reg.unregister(cid).await;
                         send_close_conn(&tx, cid).await;
@@ -249,12 +361,14 @@
                 let _ = reg.route_nowait(cid, MeResponse::Close).await;
                 reg.unregister(cid).await;
                 data_route_queue_full_streak.remove(&cid);
+                fairness.remove_flow(cid);
             } else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 {
                 let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
                 debug!(cid, "RPC_CLOSE_CONN from ME");
                 let _ =
reg.route_nowait(cid, MeResponse::Close).await; reg.unregister(cid).await; data_route_queue_full_streak.remove(&cid); + fairness.remove_flow(cid); } else if pt == RPC_PING_U32 && body.len() >= 8 { let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); trace!(ping_id, "RPC_PING -> RPC_PONG"); @@ -310,6 +424,19 @@ pub(crate) async fn reader_loop( "Unknown RPC" ); } + + let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed); + drain_fairness_scheduler( + &mut fairness, + reg.as_ref(), + &tx, + &mut data_route_queue_full_streak, + route_wait_ms, + stats.as_ref(), + ) + .await; + let current_snapshot = fairness.snapshot(); + apply_fairness_metrics_delta(stats.as_ref(), &mut fairness_snapshot, current_snapshot); } } } diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index d8625f2..0c7a0a9 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -140,6 +140,10 @@ impl ConnRegistry { } } + pub fn route_channel_capacity(&self) -> usize { + self.route_channel_capacity + } + #[cfg(test)] pub fn new() -> Self { Self::with_route_channel_capacity(4096) diff --git a/src/transport/middle_proxy/tests/fairness_security_tests.rs b/src/transport/middle_proxy/tests/fairness_security_tests.rs new file mode 100644 index 0000000..69932a3 --- /dev/null +++ b/src/transport/middle_proxy/tests/fairness_security_tests.rs @@ -0,0 +1,184 @@ +use std::time::{Duration, Instant}; + +use bytes::Bytes; + +use crate::transport::middle_proxy::fairness::{ + AdmissionDecision, DispatchAction, DispatchFeedback, PressureState, SchedulerDecision, + WorkerFairnessConfig, WorkerFairnessState, +}; + +fn enqueue_payload(size: usize) -> Bytes { + Bytes::from(vec![0xAB; size]) +} + +#[test] +fn fairness_rejects_when_worker_budget_is_exhausted() { + let now = Instant::now(); + let mut fairness = WorkerFairnessState::new( + WorkerFairnessConfig { + max_total_queued_bytes: 1024, + max_flow_queued_bytes: 1024, + ..WorkerFairnessConfig::default() + }, + now, + ); + + assert_eq!( + fairness.enqueue_data(1, 0, enqueue_payload(700), now), + AdmissionDecision::Admit + ); + assert_eq!( + fairness.enqueue_data(2, 0, enqueue_payload(400), now), + AdmissionDecision::RejectWorkerCap + ); + + let snapshot = fairness.snapshot(); + assert!(snapshot.total_queued_bytes <= 1024); + assert_eq!(snapshot.enqueue_rejects, 1); +} + +#[test] +fn fairness_marks_standing_queue_after_stall_and_age_threshold() { + let mut now = Instant::now(); + let mut fairness = WorkerFairnessState::new( + WorkerFairnessConfig { + standing_queue_min_age: Duration::from_millis(50), + standing_queue_min_backlog_bytes: 256, + standing_stall_threshold: 1, + max_flow_queued_bytes: 4096, + max_total_queued_bytes: 4096, + ..WorkerFairnessConfig::default() + }, + now, + ); + + assert_eq!( + fairness.enqueue_data(11, 0, enqueue_payload(512), now), + AdmissionDecision::Admit + ); + + now += Duration::from_millis(100); + let SchedulerDecision::Dispatch(candidate) = fairness.next_decision(now) else { + panic!("expected dispatch candidate"); + }; + + let action = fairness.apply_dispatch_feedback(11, candidate, DispatchFeedback::QueueFull, now); + assert!(matches!(action, DispatchAction::Continue)); + + let snapshot = fairness.snapshot(); + assert_eq!(snapshot.standing_flows, 1); + assert!(snapshot.backpressured_flows >= 1); +} + +#[test] +fn fairness_keeps_fast_flow_progress_under_slow_neighbor() { + let mut now = Instant::now(); + let mut fairness = WorkerFairnessState::new( + 
WorkerFairnessConfig { + max_total_queued_bytes: 64 * 1024, + max_flow_queued_bytes: 32 * 1024, + ..WorkerFairnessConfig::default() + }, + now, + ); + + for _ in 0..16 { + assert_eq!( + fairness.enqueue_data(1, 0, enqueue_payload(512), now), + AdmissionDecision::Admit + ); + assert_eq!( + fairness.enqueue_data(2, 0, enqueue_payload(512), now), + AdmissionDecision::Admit + ); + } + + let mut fast_routed = 0u64; + for _ in 0..128 { + now += Duration::from_millis(5); + let SchedulerDecision::Dispatch(candidate) = fairness.next_decision(now) else { + break; + }; + let cid = candidate.frame.conn_id; + let feedback = if cid == 2 { + DispatchFeedback::QueueFull + } else { + fast_routed = fast_routed.saturating_add(1); + DispatchFeedback::Routed + }; + let _ = fairness.apply_dispatch_feedback(cid, candidate, feedback, now); + } + + let snapshot = fairness.snapshot(); + assert!(fast_routed > 0, "fast flow must continue making progress"); + assert!(snapshot.total_queued_bytes <= 64 * 1024); +} + +#[test] +fn fairness_pressure_hysteresis_prevents_instant_flapping() { + let mut now = Instant::now(); + let mut cfg = WorkerFairnessConfig::default(); + cfg.max_total_queued_bytes = 4096; + cfg.max_flow_queued_bytes = 4096; + cfg.pressure.evaluate_every_rounds = 1; + cfg.pressure.transition_hysteresis_rounds = 3; + cfg.pressure.queue_ratio_pressured_pct = 40; + cfg.pressure.queue_ratio_shedding_pct = 60; + cfg.pressure.queue_ratio_saturated_pct = 80; + + let mut fairness = WorkerFairnessState::new(cfg, now); + + for _ in 0..4 { + assert_eq!( + fairness.enqueue_data(9, 0, enqueue_payload(900), now), + AdmissionDecision::Admit + ); + } + + for _ in 0..2 { + now += Duration::from_millis(1); + let _ = fairness.next_decision(now); + } + + assert_eq!( + fairness.pressure_state(), + PressureState::Normal, + "state must not flip before hysteresis confirmations" + ); +} + +#[test] +fn fairness_randomized_sequence_preserves_memory_bounds() { + let mut now = Instant::now(); + let mut fairness = WorkerFairnessState::new( + WorkerFairnessConfig { + max_total_queued_bytes: 32 * 1024, + max_flow_queued_bytes: 4 * 1024, + ..WorkerFairnessConfig::default() + }, + now, + ); + + let mut seed = 0xC0FFEE_u64; + for _ in 0..4096 { + seed ^= seed << 7; + seed ^= seed >> 9; + seed ^= seed << 8; + let flow = (seed % 32) + 1; + let size = ((seed >> 8) % 512 + 64) as usize; + let _ = fairness.enqueue_data(flow, 0, enqueue_payload(size), now); + + now += Duration::from_millis(1); + if let SchedulerDecision::Dispatch(candidate) = fairness.next_decision(now) { + let feedback = if seed & 0x1 == 0 { + DispatchFeedback::Routed + } else { + DispatchFeedback::QueueFull + }; + let _ = fairness.apply_dispatch_feedback(candidate.frame.conn_id, candidate, feedback, now); + } + + let snapshot = fairness.snapshot(); + assert!(snapshot.total_queued_bytes <= 32 * 1024); + } +}
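Once a worker carries traffic, the families added in src/metrics.rs render along these lines (sample exposition output; values illustrative):

    telemt_me_fair_pressure_state 1
    telemt_me_fair_active_flows 37
    telemt_me_fair_queued_bytes 524288
    telemt_me_fair_flow_state_gauge{class="standing"} 2
    telemt_me_fair_flow_state_gauge{class="backpressured"} 5
    telemt_me_fair_events_total{event="scheduler_round"} 183204
    telemt_me_fair_events_total{event="deficit_grant"} 166101
    telemt_me_fair_events_total{event="deficit_skip"} 1203
    telemt_me_fair_events_total{event="enqueue_reject"} 412
    telemt_me_fair_events_total{event="shed_drop"} 9
    telemt_me_fair_events_total{event="penalty"} 9
    telemt_me_fair_events_total{event="downstream_stall"} 57

The counters stay monotonic even though each reader loop owns its own WorkerFairnessState: apply_fairness_metrics_delta exports only the delta between successive per-worker snapshots into the shared Stats counters.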