From 9cb49bc02489038186abc5aa3eb8528fe0727f83 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 19 Apr 2026 19:03:45 +0300 Subject: [PATCH] Fix in Fairness tests --- src/transport/middle_proxy/fairness/pressure.rs | 4 +++- src/transport/middle_proxy/fairness/scheduler.rs | 12 +++++------- src/transport/middle_proxy/reader.rs | 15 ++++++++++++--- .../middle_proxy/tests/fairness_security_tests.rs | 4 ++-- 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/transport/middle_proxy/fairness/pressure.rs b/src/transport/middle_proxy/fairness/pressure.rs index de8d605..6c84a4c 100644 --- a/src/transport/middle_proxy/fairness/pressure.rs +++ b/src/transport/middle_proxy/fairness/pressure.rs @@ -149,7 +149,9 @@ impl PressureEvaluator { let mut pressured = false; let mut saturated = false; - let queue_saturated_pct = cfg.queue_ratio_shedding_pct.min(cfg.queue_ratio_saturated_pct); + let queue_saturated_pct = cfg + .queue_ratio_shedding_pct + .min(cfg.queue_ratio_saturated_pct); if queue_ratio_pct >= cfg.queue_ratio_pressured_pct { pressured = true; } diff --git a/src/transport/middle_proxy/fairness/scheduler.rs b/src/transport/middle_proxy/fairness/scheduler.rs index 48b6790..b8079ce 100644 --- a/src/transport/middle_proxy/fairness/scheduler.rs +++ b/src/transport/middle_proxy/fairness/scheduler.rs @@ -1,8 +1,8 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, Instant}; -use bytes::Bytes; use crate::protocol::constants::RPC_FLAG_QUICKACK; +use bytes::Bytes; use super::model::{ AdmissionDecision, DispatchAction, DispatchCandidate, DispatchFeedback, FlowFairnessState, @@ -310,11 +310,8 @@ impl WorkerFairnessState { } else { Self::classify_flow(&self.config, pressure_state, now, &mut flow.fairness); - let quantum = Self::effective_quantum_bytes( - &self.config, - pressure_state, - &flow.fairness, - ); + let quantum = + Self::effective_quantum_bytes(&self.config, pressure_state, &flow.fairness); flow.fairness.deficit_bytes = flow .fairness .deficit_bytes @@ -507,7 +504,8 @@ impl WorkerFairnessState { return; }; self.active_ring_members.remove(&conn_id); - self.active_ring.retain(|queued_conn_id| *queued_conn_id != conn_id); + self.active_ring + .retain(|queued_conn_id| *queued_conn_id != conn_id); let (was_standing, was_backpressured) = Self::flow_membership(&entry.fairness); if was_standing { self.standing_flow_count = self.standing_flow_count.saturating_sub(1); diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index 0f8880d..e1e919f 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -512,9 +512,18 @@ mod tests { assert!(is_data_route_queue_full(RouteResult::QueueFullHigh)); assert!(!is_data_route_queue_full(RouteResult::NoConn)); assert!(!should_close_on_queue_full_streak(1, PressureState::Normal)); - assert!(!should_close_on_queue_full_streak(2, PressureState::Pressured)); - assert!(!should_close_on_queue_full_streak(3, PressureState::Pressured)); - assert!(should_close_on_queue_full_streak(3, PressureState::Shedding)); + assert!(!should_close_on_queue_full_streak( + 2, + PressureState::Pressured + )); + assert!(!should_close_on_queue_full_streak( + 3, + PressureState::Pressured + )); + assert!(should_close_on_queue_full_streak( + 3, + PressureState::Shedding + )); assert!(should_close_on_queue_full_streak( u8::MAX, PressureState::Saturated diff --git a/src/transport/middle_proxy/tests/fairness_security_tests.rs b/src/transport/middle_proxy/tests/fairness_security_tests.rs index eae3237..b32493b 100644 --- a/src/transport/middle_proxy/tests/fairness_security_tests.rs +++ b/src/transport/middle_proxy/tests/fairness_security_tests.rs @@ -121,7 +121,7 @@ fn fairness_prioritizes_quickack_flow_when_weights_enabled() { let mut fairness = WorkerFairnessState::new( WorkerFairnessConfig { max_total_queued_bytes: 256 * 1024, - max_flow_queued_bytes: 64 * 1024, + max_flow_queued_bytes: 128 * 1024, base_quantum_bytes: 8 * 1024, pressured_quantum_bytes: 8 * 1024, penalized_quantum_bytes: 8 * 1024, @@ -185,7 +185,7 @@ fn fairness_pressure_hysteresis_prevents_instant_flapping() { let mut fairness = WorkerFairnessState::new(cfg, now); - for _ in 0..4 { + for _ in 0..3 { assert_eq!( fairness.enqueue_data(9, 0, enqueue_payload(900), now), AdmissionDecision::Admit