Backpressure-driven Fairness

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-04-17 10:33:37 +03:00
parent d100941426
commit 5c99cd8eb7
12 changed files with 1521 additions and 32 deletions
@@ -0,0 +1,184 @@
use std::time::{Duration, Instant};
use bytes::Bytes;
use crate::transport::middle_proxy::fairness::{
AdmissionDecision, DispatchAction, DispatchFeedback, PressureState, SchedulerDecision,
WorkerFairnessConfig, WorkerFairnessState,
};
fn enqueue_payload(size: usize) -> Bytes {
Bytes::from(vec![0xAB; size])
}
#[test]
fn fairness_rejects_when_worker_budget_is_exhausted() {
let now = Instant::now();
let mut fairness = WorkerFairnessState::new(
WorkerFairnessConfig {
max_total_queued_bytes: 1024,
max_flow_queued_bytes: 1024,
..WorkerFairnessConfig::default()
},
now,
);
assert_eq!(
fairness.enqueue_data(1, 0, enqueue_payload(700), now),
AdmissionDecision::Admit
);
assert_eq!(
fairness.enqueue_data(2, 0, enqueue_payload(400), now),
AdmissionDecision::RejectWorkerCap
);
let snapshot = fairness.snapshot();
assert!(snapshot.total_queued_bytes <= 1024);
assert_eq!(snapshot.enqueue_rejects, 1);
}
#[test]
fn fairness_marks_standing_queue_after_stall_and_age_threshold() {
let mut now = Instant::now();
let mut fairness = WorkerFairnessState::new(
WorkerFairnessConfig {
standing_queue_min_age: Duration::from_millis(50),
standing_queue_min_backlog_bytes: 256,
standing_stall_threshold: 1,
max_flow_queued_bytes: 4096,
max_total_queued_bytes: 4096,
..WorkerFairnessConfig::default()
},
now,
);
assert_eq!(
fairness.enqueue_data(11, 0, enqueue_payload(512), now),
AdmissionDecision::Admit
);
now += Duration::from_millis(100);
let SchedulerDecision::Dispatch(candidate) = fairness.next_decision(now) else {
panic!("expected dispatch candidate");
};
let action = fairness.apply_dispatch_feedback(11, candidate, DispatchFeedback::QueueFull, now);
assert!(matches!(action, DispatchAction::Continue));
let snapshot = fairness.snapshot();
assert_eq!(snapshot.standing_flows, 1);
assert!(snapshot.backpressured_flows >= 1);
}
#[test]
fn fairness_keeps_fast_flow_progress_under_slow_neighbor() {
let mut now = Instant::now();
let mut fairness = WorkerFairnessState::new(
WorkerFairnessConfig {
max_total_queued_bytes: 64 * 1024,
max_flow_queued_bytes: 32 * 1024,
..WorkerFairnessConfig::default()
},
now,
);
for _ in 0..16 {
assert_eq!(
fairness.enqueue_data(1, 0, enqueue_payload(512), now),
AdmissionDecision::Admit
);
assert_eq!(
fairness.enqueue_data(2, 0, enqueue_payload(512), now),
AdmissionDecision::Admit
);
}
let mut fast_routed = 0u64;
for _ in 0..128 {
now += Duration::from_millis(5);
let SchedulerDecision::Dispatch(candidate) = fairness.next_decision(now) else {
break;
};
let cid = candidate.frame.conn_id;
let feedback = if cid == 2 {
DispatchFeedback::QueueFull
} else {
fast_routed = fast_routed.saturating_add(1);
DispatchFeedback::Routed
};
let _ = fairness.apply_dispatch_feedback(cid, candidate, feedback, now);
}
let snapshot = fairness.snapshot();
assert!(fast_routed > 0, "fast flow must continue making progress");
assert!(snapshot.total_queued_bytes <= 64 * 1024);
}
#[test]
fn fairness_pressure_hysteresis_prevents_instant_flapping() {
let mut now = Instant::now();
let mut cfg = WorkerFairnessConfig::default();
cfg.max_total_queued_bytes = 4096;
cfg.max_flow_queued_bytes = 4096;
cfg.pressure.evaluate_every_rounds = 1;
cfg.pressure.transition_hysteresis_rounds = 3;
cfg.pressure.queue_ratio_pressured_pct = 40;
cfg.pressure.queue_ratio_shedding_pct = 60;
cfg.pressure.queue_ratio_saturated_pct = 80;
let mut fairness = WorkerFairnessState::new(cfg, now);
for _ in 0..4 {
assert_eq!(
fairness.enqueue_data(9, 0, enqueue_payload(900), now),
AdmissionDecision::Admit
);
}
for _ in 0..2 {
now += Duration::from_millis(1);
let _ = fairness.next_decision(now);
}
assert_eq!(
fairness.pressure_state(),
PressureState::Normal,
"state must not flip before hysteresis confirmations"
);
}
#[test]
fn fairness_randomized_sequence_preserves_memory_bounds() {
let mut now = Instant::now();
let mut fairness = WorkerFairnessState::new(
WorkerFairnessConfig {
max_total_queued_bytes: 32 * 1024,
max_flow_queued_bytes: 4 * 1024,
..WorkerFairnessConfig::default()
},
now,
);
let mut seed = 0xC0FFEE_u64;
for _ in 0..4096 {
seed ^= seed << 7;
seed ^= seed >> 9;
seed ^= seed << 8;
let flow = (seed % 32) + 1;
let size = ((seed >> 8) % 512 + 64) as usize;
let _ = fairness.enqueue_data(flow, 0, enqueue_payload(size), now);
now += Duration::from_millis(1);
if let SchedulerDecision::Dispatch(candidate) = fairness.next_decision(now) {
let feedback = if seed & 0x1 == 0 {
DispatchFeedback::Routed
} else {
DispatchFeedback::QueueFull
};
let _ = fairness.apply_dispatch_feedback(candidate.frame.conn_id, candidate, feedback, now);
}
let snapshot = fairness.snapshot();
assert!(snapshot.total_queued_bytes <= 32 * 1024);
}
}