mirror of
https://github.com/telemt/telemt.git
synced 2026-04-20 03:54:09 +03:00
Fix in Fairness tests
This commit is contained in:
@@ -149,7 +149,9 @@ impl PressureEvaluator {
|
|||||||
let mut pressured = false;
|
let mut pressured = false;
|
||||||
let mut saturated = false;
|
let mut saturated = false;
|
||||||
|
|
||||||
let queue_saturated_pct = cfg.queue_ratio_shedding_pct.min(cfg.queue_ratio_saturated_pct);
|
let queue_saturated_pct = cfg
|
||||||
|
.queue_ratio_shedding_pct
|
||||||
|
.min(cfg.queue_ratio_saturated_pct);
|
||||||
if queue_ratio_pct >= cfg.queue_ratio_pressured_pct {
|
if queue_ratio_pct >= cfg.queue_ratio_pressured_pct {
|
||||||
pressured = true;
|
pressured = true;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
use std::collections::{HashMap, HashSet, VecDeque};
|
use std::collections::{HashMap, HashSet, VecDeque};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use bytes::Bytes;
|
|
||||||
use crate::protocol::constants::RPC_FLAG_QUICKACK;
|
use crate::protocol::constants::RPC_FLAG_QUICKACK;
|
||||||
|
use bytes::Bytes;
|
||||||
|
|
||||||
use super::model::{
|
use super::model::{
|
||||||
AdmissionDecision, DispatchAction, DispatchCandidate, DispatchFeedback, FlowFairnessState,
|
AdmissionDecision, DispatchAction, DispatchCandidate, DispatchFeedback, FlowFairnessState,
|
||||||
@@ -310,11 +310,8 @@ impl WorkerFairnessState {
|
|||||||
} else {
|
} else {
|
||||||
Self::classify_flow(&self.config, pressure_state, now, &mut flow.fairness);
|
Self::classify_flow(&self.config, pressure_state, now, &mut flow.fairness);
|
||||||
|
|
||||||
let quantum = Self::effective_quantum_bytes(
|
let quantum =
|
||||||
&self.config,
|
Self::effective_quantum_bytes(&self.config, pressure_state, &flow.fairness);
|
||||||
pressure_state,
|
|
||||||
&flow.fairness,
|
|
||||||
);
|
|
||||||
flow.fairness.deficit_bytes = flow
|
flow.fairness.deficit_bytes = flow
|
||||||
.fairness
|
.fairness
|
||||||
.deficit_bytes
|
.deficit_bytes
|
||||||
@@ -507,7 +504,8 @@ impl WorkerFairnessState {
|
|||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
self.active_ring_members.remove(&conn_id);
|
self.active_ring_members.remove(&conn_id);
|
||||||
self.active_ring.retain(|queued_conn_id| *queued_conn_id != conn_id);
|
self.active_ring
|
||||||
|
.retain(|queued_conn_id| *queued_conn_id != conn_id);
|
||||||
let (was_standing, was_backpressured) = Self::flow_membership(&entry.fairness);
|
let (was_standing, was_backpressured) = Self::flow_membership(&entry.fairness);
|
||||||
if was_standing {
|
if was_standing {
|
||||||
self.standing_flow_count = self.standing_flow_count.saturating_sub(1);
|
self.standing_flow_count = self.standing_flow_count.saturating_sub(1);
|
||||||
|
|||||||
@@ -512,9 +512,18 @@ mod tests {
|
|||||||
assert!(is_data_route_queue_full(RouteResult::QueueFullHigh));
|
assert!(is_data_route_queue_full(RouteResult::QueueFullHigh));
|
||||||
assert!(!is_data_route_queue_full(RouteResult::NoConn));
|
assert!(!is_data_route_queue_full(RouteResult::NoConn));
|
||||||
assert!(!should_close_on_queue_full_streak(1, PressureState::Normal));
|
assert!(!should_close_on_queue_full_streak(1, PressureState::Normal));
|
||||||
assert!(!should_close_on_queue_full_streak(2, PressureState::Pressured));
|
assert!(!should_close_on_queue_full_streak(
|
||||||
assert!(!should_close_on_queue_full_streak(3, PressureState::Pressured));
|
2,
|
||||||
assert!(should_close_on_queue_full_streak(3, PressureState::Shedding));
|
PressureState::Pressured
|
||||||
|
));
|
||||||
|
assert!(!should_close_on_queue_full_streak(
|
||||||
|
3,
|
||||||
|
PressureState::Pressured
|
||||||
|
));
|
||||||
|
assert!(should_close_on_queue_full_streak(
|
||||||
|
3,
|
||||||
|
PressureState::Shedding
|
||||||
|
));
|
||||||
assert!(should_close_on_queue_full_streak(
|
assert!(should_close_on_queue_full_streak(
|
||||||
u8::MAX,
|
u8::MAX,
|
||||||
PressureState::Saturated
|
PressureState::Saturated
|
||||||
|
|||||||
@@ -121,7 +121,7 @@ fn fairness_prioritizes_quickack_flow_when_weights_enabled() {
|
|||||||
let mut fairness = WorkerFairnessState::new(
|
let mut fairness = WorkerFairnessState::new(
|
||||||
WorkerFairnessConfig {
|
WorkerFairnessConfig {
|
||||||
max_total_queued_bytes: 256 * 1024,
|
max_total_queued_bytes: 256 * 1024,
|
||||||
max_flow_queued_bytes: 64 * 1024,
|
max_flow_queued_bytes: 128 * 1024,
|
||||||
base_quantum_bytes: 8 * 1024,
|
base_quantum_bytes: 8 * 1024,
|
||||||
pressured_quantum_bytes: 8 * 1024,
|
pressured_quantum_bytes: 8 * 1024,
|
||||||
penalized_quantum_bytes: 8 * 1024,
|
penalized_quantum_bytes: 8 * 1024,
|
||||||
@@ -185,7 +185,7 @@ fn fairness_pressure_hysteresis_prevents_instant_flapping() {
|
|||||||
|
|
||||||
let mut fairness = WorkerFairnessState::new(cfg, now);
|
let mut fairness = WorkerFairnessState::new(cfg, now);
|
||||||
|
|
||||||
for _ in 0..4 {
|
for _ in 0..3 {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
fairness.enqueue_data(9, 0, enqueue_payload(900), now),
|
fairness.enqueue_data(9, 0, enqueue_payload(900), now),
|
||||||
AdmissionDecision::Admit
|
AdmissionDecision::Admit
|
||||||
|
|||||||
Reference in New Issue
Block a user