Improve ME downstream retries for queued fairness backlog

This commit is contained in:
astronaut808
2026-04-18 02:40:32 +05:00
parent 1b25bada29
commit 2d5cd9c8e1
2 changed files with 74 additions and 20 deletions

View File

@@ -7,7 +7,6 @@ mod model;
mod pressure;
mod scheduler;
#[cfg(test)]
pub(crate) use model::PressureState;
pub(crate) use model::{AdmissionDecision, DispatchAction, DispatchFeedback, SchedulerDecision};
pub(crate) use scheduler::{WorkerFairnessConfig, WorkerFairnessSnapshot, WorkerFairnessState};

View File

@@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::io::ErrorKind;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::time::Instant;
use std::time::{Duration, Instant};
use bytes::{Bytes, BytesMut};
use tokio::io::AsyncReadExt;
@@ -21,8 +21,8 @@ use crate::stats::Stats;
use super::codec::{RpcChecksumMode, WriterCommand, rpc_crc};
use super::fairness::{
AdmissionDecision, DispatchAction, DispatchFeedback, SchedulerDecision, WorkerFairnessConfig,
WorkerFairnessSnapshot, WorkerFairnessState,
AdmissionDecision, DispatchAction, DispatchFeedback, PressureState, SchedulerDecision,
WorkerFairnessConfig, WorkerFairnessSnapshot, WorkerFairnessState,
};
use super::registry::RouteResult;
use super::{ConnRegistry, MeResponse};
@@ -45,10 +45,22 @@ fn is_data_route_queue_full(result: RouteResult) -> bool {
)
}
fn should_close_on_queue_full_streak(streak: u8) -> bool {
/// Decide whether a connection whose data route keeps reporting "queue full"
/// should be closed outright.
///
/// Streak-based eviction is only permitted once the worker has reached the
/// `Shedding` pressure level (or worse); below that we keep the flow alive and
/// rely on retries instead. At or above `Shedding`, the flow is closed once its
/// consecutive queue-full streak reaches the starvation threshold.
fn should_close_on_queue_full_streak(streak: u8, pressure_state: PressureState) -> bool {
    pressure_state >= PressureState::Shedding
        && streak >= DATA_ROUTE_QUEUE_FULL_STARVATION_THRESHOLD
}
/// A fairness-backlog retry tick is only worth arming while the scheduler
/// still holds queued bytes; an idle queue would make the timer pure overhead.
fn should_schedule_fairness_retry(snapshot: &WorkerFairnessSnapshot) -> bool {
    snapshot.total_queued_bytes != 0
}
/// Convert the configured route-wait (in milliseconds) into the retry delay
/// used by the reader loop's backlog timer.
///
/// The delay is clamped to a floor of 1 ms so that a zero configuration can
/// never turn the `select!` sleep arm into a busy-spin.
fn fairness_retry_delay(route_wait_ms: u64) -> Duration {
    let clamped_ms = if route_wait_ms == 0 { 1 } else { route_wait_ms };
    Duration::from_millis(clamped_ms)
}
async fn route_data_with_retry(
reg: &ConnRegistry,
conn_id: u64,
@@ -157,7 +169,7 @@ async fn drain_fairness_scheduler(
break;
};
let cid = candidate.frame.conn_id;
let _pressure_state = candidate.pressure_state;
let pressure_state = candidate.pressure_state;
let _flow_class = candidate.flow_class;
let routed = route_data_with_retry(
reg,
@@ -176,7 +188,7 @@ async fn drain_fairness_scheduler(
if is_data_route_queue_full(routed) {
let streak = data_route_queue_full_streak.entry(cid).or_insert(0);
*streak = streak.saturating_add(1);
if should_close_on_queue_full_streak(*streak) {
if should_close_on_queue_full_streak(*streak, pressure_state) {
fairness.remove_flow(cid);
data_route_queue_full_streak.remove(&cid);
reg.unregister(cid).await;
@@ -231,10 +243,33 @@ pub(crate) async fn reader_loop(
let mut fairness_snapshot = fairness.snapshot();
loop {
let mut tmp = [0u8; 65_536];
let backlog_retry_enabled = should_schedule_fairness_retry(&fairness_snapshot);
let backlog_retry_delay =
fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed));
let mut retry_only = false;
let n = tokio::select! {
res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?,
_ = tokio::time::sleep(backlog_retry_delay), if backlog_retry_enabled => {
retry_only = true;
0usize
},
_ = cancel.cancelled() => return Ok(()),
};
if retry_only {
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
drain_fairness_scheduler(
&mut fairness,
reg.as_ref(),
&tx,
&mut data_route_queue_full_streak,
route_wait_ms,
stats.as_ref(),
)
.await;
let current_snapshot = fairness.snapshot();
apply_fairness_metrics_delta(stats.as_ref(), &mut fairness_snapshot, current_snapshot);
continue;
}
if n == 0 {
stats.increment_me_reader_eof_total();
return Err(ProxyError::Io(std::io::Error::new(
@@ -317,12 +352,9 @@ pub(crate) async fn reader_loop(
stats.increment_me_route_drop_queue_full_high();
let streak = data_route_queue_full_streak.entry(cid).or_insert(0);
*streak = streak.saturating_add(1);
if should_close_on_queue_full_streak(*streak)
|| matches!(
admission,
AdmissionDecision::RejectSaturated
| AdmissionDecision::RejectStandingFlow
)
let pressure_state = fairness.pressure_state();
if should_close_on_queue_full_streak(*streak, pressure_state)
|| matches!(admission, AdmissionDecision::RejectSaturated)
{
fairness.remove_flow(cid);
data_route_queue_full_streak.remove(&cid);
@@ -445,14 +477,18 @@ pub(crate) async fn reader_loop(
#[cfg(test)]
mod tests {
use std::time::Duration;
use bytes::Bytes;
use super::PressureState;
use crate::transport::middle_proxy::ConnRegistry;
use super::{
MeResponse, RouteResult, is_data_route_queue_full, route_data_with_retry,
should_close_on_queue_full_streak, should_close_on_route_result_for_ack,
should_close_on_route_result_for_data,
MeResponse, RouteResult, WorkerFairnessSnapshot, fairness_retry_delay,
is_data_route_queue_full, route_data_with_retry, should_close_on_queue_full_streak,
should_close_on_route_result_for_ack, should_close_on_route_result_for_data,
should_schedule_fairness_retry,
};
#[test]
@@ -475,10 +511,29 @@ mod tests {
assert!(is_data_route_queue_full(RouteResult::QueueFullBase));
assert!(is_data_route_queue_full(RouteResult::QueueFullHigh));
assert!(!is_data_route_queue_full(RouteResult::NoConn));
assert!(!should_close_on_queue_full_streak(1));
assert!(!should_close_on_queue_full_streak(2));
assert!(should_close_on_queue_full_streak(3));
assert!(should_close_on_queue_full_streak(u8::MAX));
assert!(!should_close_on_queue_full_streak(1, PressureState::Normal));
assert!(!should_close_on_queue_full_streak(2, PressureState::Pressured));
assert!(!should_close_on_queue_full_streak(3, PressureState::Pressured));
assert!(should_close_on_queue_full_streak(3, PressureState::Shedding));
assert!(should_close_on_queue_full_streak(
u8::MAX,
PressureState::Saturated
));
}
#[test]
fn fairness_retry_is_scheduled_only_when_queue_has_pending_bytes() {
    // With no queued bytes the reader loop must not arm the retry timer.
    let mut queue_view = WorkerFairnessSnapshot::default();
    assert!(!should_schedule_fairness_retry(&queue_view));
    // Any non-zero backlog — even a single byte — flips the decision.
    queue_view.total_queued_bytes = 1;
    assert!(should_schedule_fairness_retry(&queue_view));
}
#[test]
fn fairness_retry_delay_never_drops_below_one_millisecond() {
    // Zero is clamped up to the 1 ms floor so the select arm cannot busy-spin.
    assert_eq!(fairness_retry_delay(0), Duration::from_millis(1));
    // The boundary value itself must pass through unchanged (exactly the floor).
    assert_eq!(fairness_retry_delay(1), Duration::from_millis(1));
    // Values above the floor are preserved as-is.
    assert_eq!(fairness_retry_delay(2), Duration::from_millis(2));
}
#[test]