This commit is contained in:
Alexey
2026-04-17 10:48:01 +03:00
parent 073eacbb37
commit 17a966b822
13 changed files with 116 additions and 84 deletions

View File

@@ -136,8 +136,8 @@ impl PressureEvaluator {
let queue_ratio_pct = if max_total_queued_bytes == 0 {
100
} else {
((signals.total_queued_bytes.saturating_mul(100)) / max_total_queued_bytes)
.min(100) as u8
((signals.total_queued_bytes.saturating_mul(100)) / max_total_queued_bytes).min(100)
as u8
};
let standing_ratio_pct = if signals.active_flows == 0 {

View File

@@ -166,7 +166,8 @@ impl WorkerFairnessState {
return AdmissionDecision::RejectSaturated;
}
if self.total_queued_bytes.saturating_add(frame_bytes) > self.config.max_total_queued_bytes {
if self.total_queued_bytes.saturating_add(frame_bytes) > self.config.max_total_queued_bytes
{
self.pressure
.note_admission_reject(now, &self.config.pressure);
self.enqueue_rejects = self.enqueue_rejects.saturating_add(1);
@@ -211,7 +212,8 @@ impl WorkerFairnessState {
.expect("flow inserted must be retrievable")
};
if entry.fairness.pending_bytes.saturating_add(frame_bytes) > self.config.max_flow_queued_bytes
if entry.fairness.pending_bytes.saturating_add(frame_bytes)
> self.config.max_flow_queued_bytes
{
self.pressure
.note_admission_reject(now, &self.config.pressure);
@@ -237,7 +239,8 @@ impl WorkerFairnessState {
entry.queue.push_back(frame);
self.total_queued_bytes = self.total_queued_bytes.saturating_add(frame_bytes);
self.bucket_queued_bytes[bucket_id] = self.bucket_queued_bytes[bucket_id].saturating_add(frame_bytes);
self.bucket_queued_bytes[bucket_id] =
self.bucket_queued_bytes[bucket_id].saturating_add(frame_bytes);
if !entry.fairness.in_active_ring {
entry.fairness.in_active_ring = true;
@@ -277,23 +280,31 @@ impl WorkerFairnessState {
Self::classify_flow(&self.config, pressure_state, now, &mut flow.fairness);
let quantum = Self::effective_quantum_bytes(&self.config, pressure_state, &flow.fairness);
flow.fairness.deficit_bytes =
flow.fairness.deficit_bytes.saturating_add(i64::from(quantum));
let quantum =
Self::effective_quantum_bytes(&self.config, pressure_state, &flow.fairness);
flow.fairness.deficit_bytes = flow
.fairness
.deficit_bytes
.saturating_add(i64::from(quantum));
self.deficit_grants = self.deficit_grants.saturating_add(1);
let front_len = flow.queue.front().map_or(0, |front| front.queued_bytes());
if flow.fairness.deficit_bytes < front_len as i64 {
flow.fairness.consecutive_skips = flow.fairness.consecutive_skips.saturating_add(1);
flow.fairness.consecutive_skips =
flow.fairness.consecutive_skips.saturating_add(1);
self.deficit_skips = self.deficit_skips.saturating_add(1);
requeue_active = true;
} else if let Some(frame) = flow.queue.pop_front() {
drained_bytes = frame.queued_bytes();
flow.fairness.pending_bytes = flow.fairness.pending_bytes.saturating_sub(drained_bytes);
flow.fairness.deficit_bytes =
flow.fairness.deficit_bytes.saturating_sub(drained_bytes as i64);
flow.fairness.pending_bytes =
flow.fairness.pending_bytes.saturating_sub(drained_bytes);
flow.fairness.deficit_bytes = flow
.fairness
.deficit_bytes
.saturating_sub(drained_bytes as i64);
flow.fairness.consecutive_skips = 0;
flow.fairness.queue_started_at = flow.queue.front().map(|front| front.enqueued_at);
flow.fairness.queue_started_at =
flow.queue.front().map(|front| front.enqueued_at);
requeue_active = !flow.queue.is_empty();
if !requeue_active {
flow.fairness.scheduler_state = FlowSchedulerState::Idle;
@@ -359,7 +370,8 @@ impl WorkerFairnessState {
return DispatchAction::Continue;
};
flow.fairness.consecutive_stalls = flow.fairness.consecutive_stalls.saturating_add(1);
flow.fairness.consecutive_stalls =
flow.fairness.consecutive_stalls.saturating_add(1);
flow.fairness.scheduler_state = FlowSchedulerState::Backpressured;
flow.fairness.pressure_class = FlowPressureClass::Backpressured;
@@ -376,7 +388,8 @@ impl WorkerFairnessState {
} else {
let frame_bytes = candidate.frame.queued_bytes();
flow.queue.push_front(candidate.frame);
flow.fairness.pending_bytes = flow.fairness.pending_bytes.saturating_add(frame_bytes);
flow.fairness.pending_bytes =
flow.fairness.pending_bytes.saturating_add(frame_bytes);
if flow.fairness.queue_started_at.is_none() {
flow.fairness.queue_started_at = Some(now);
}
@@ -390,7 +403,8 @@ impl WorkerFairnessState {
}
}
if flow.fairness.consecutive_stalls >= self.config.max_consecutive_stalls_before_close
if flow.fairness.consecutive_stalls
>= self.config.max_consecutive_stalls_before_close
&& self.pressure.state() == PressureState::Saturated
{
self.remove_flow(conn_id);
@@ -414,18 +428,16 @@ impl WorkerFairnessState {
return;
};
self.bucket_active_flows[entry.fairness.bucket_id] = self.bucket_active_flows
[entry.fairness.bucket_id]
.saturating_sub(1);
self.bucket_active_flows[entry.fairness.bucket_id] =
self.bucket_active_flows[entry.fairness.bucket_id].saturating_sub(1);
let mut reclaimed = 0u64;
for frame in entry.queue {
reclaimed = reclaimed.saturating_add(frame.queued_bytes());
}
self.total_queued_bytes = self.total_queued_bytes.saturating_sub(reclaimed);
self.bucket_queued_bytes[entry.fairness.bucket_id] = self.bucket_queued_bytes
[entry.fairness.bucket_id]
.saturating_sub(reclaimed);
self.bucket_queued_bytes[entry.fairness.bucket_id] =
self.bucket_queued_bytes[entry.fairness.bucket_id].saturating_sub(reclaimed);
}
fn evaluate_pressure(&mut self, now: Instant, force: bool) {

View File

@@ -3,6 +3,9 @@
mod codec;
mod config_updater;
mod fairness;
#[cfg(test)]
#[path = "tests/fairness_security_tests.rs"]
mod fairness_security_tests;
mod handshake;
mod health;
#[cfg(test)]
@@ -31,9 +34,6 @@ mod pool_writer;
#[cfg(test)]
#[path = "tests/pool_writer_security_tests.rs"]
mod pool_writer_security_tests;
#[cfg(test)]
#[path = "tests/fairness_security_tests.rs"]
mod fairness_security_tests;
mod reader;
mod registry;
mod rotation;

View File

@@ -118,20 +118,22 @@ fn apply_fairness_metrics_delta(
stats.set_me_fair_backpressured_flows_gauge(current.backpressured_flows as u64);
stats.set_me_fair_pressure_state_gauge(current.pressure_state.as_u8() as u64);
stats.add_me_fair_scheduler_rounds_total(
current.scheduler_rounds.saturating_sub(prev.scheduler_rounds),
current
.scheduler_rounds
.saturating_sub(prev.scheduler_rounds),
);
stats.add_me_fair_deficit_grants_total(
current.deficit_grants.saturating_sub(prev.deficit_grants),
);
stats.add_me_fair_deficit_skips_total(
current.deficit_skips.saturating_sub(prev.deficit_skips),
);
stats.add_me_fair_deficit_skips_total(current.deficit_skips.saturating_sub(prev.deficit_skips));
stats.add_me_fair_enqueue_rejects_total(
current.enqueue_rejects.saturating_sub(prev.enqueue_rejects),
);
stats.add_me_fair_shed_drops_total(current.shed_drops.saturating_sub(prev.shed_drops));
stats.add_me_fair_penalties_total(
current.fairness_penalties.saturating_sub(prev.fairness_penalties),
current
.fairness_penalties
.saturating_sub(prev.fairness_penalties),
);
stats.add_me_fair_downstream_stalls_total(
current

View File

@@ -175,7 +175,8 @@ fn fairness_randomized_sequence_preserves_memory_bounds() {
} else {
DispatchFeedback::QueueFull
};
let _ = fairness.apply_dispatch_feedback(candidate.frame.conn_id, candidate, feedback, now);
let _ =
fairness.apply_dispatch_feedback(candidate.frame.conn_id, candidate, feedback, now);
}
let snapshot = fairness.snapshot();