From 2d5cd9c8e12a4f5cb098cf4c31a24896c51e2eef Mon Sep 17 00:00:00 2001
From: astronaut808 <38975427+astronaut808@users.noreply.github.com>
Date: Sat, 18 Apr 2026 02:40:32 +0500
Subject: [PATCH] Improve ME downstream retries for queued fairness backlog

---
 src/transport/middle_proxy/fairness/mod.rs |  1 -
 src/transport/middle_proxy/reader.rs       | 93 +++++++++++++++++-----
 2 files changed, 74 insertions(+), 20 deletions(-)

diff --git a/src/transport/middle_proxy/fairness/mod.rs b/src/transport/middle_proxy/fairness/mod.rs
index 58eb890..dc9898d 100644
--- a/src/transport/middle_proxy/fairness/mod.rs
+++ b/src/transport/middle_proxy/fairness/mod.rs
@@ -7,7 +7,6 @@ mod model;
 mod pressure;
 mod scheduler;
 
-#[cfg(test)]
 pub(crate) use model::PressureState;
 pub(crate) use model::{AdmissionDecision, DispatchAction, DispatchFeedback, SchedulerDecision};
 pub(crate) use scheduler::{WorkerFairnessConfig, WorkerFairnessSnapshot, WorkerFairnessState};
diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs
index 8041185..0f8880d 100644
--- a/src/transport/middle_proxy/reader.rs
+++ b/src/transport/middle_proxy/reader.rs
@@ -4,7 +4,7 @@ use std::collections::HashMap;
 use std::io::ErrorKind;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
-use std::time::Instant;
+use std::time::{Duration, Instant};
 
 use bytes::{Bytes, BytesMut};
 use tokio::io::AsyncReadExt;
@@ -21,8 +21,8 @@ use crate::stats::Stats;
 
 use super::codec::{RpcChecksumMode, WriterCommand, rpc_crc};
 use super::fairness::{
-    AdmissionDecision, DispatchAction, DispatchFeedback, SchedulerDecision, WorkerFairnessConfig,
-    WorkerFairnessSnapshot, WorkerFairnessState,
+    AdmissionDecision, DispatchAction, DispatchFeedback, PressureState, SchedulerDecision,
+    WorkerFairnessConfig, WorkerFairnessSnapshot, WorkerFairnessState,
 };
 use super::registry::RouteResult;
 use super::{ConnRegistry, MeResponse};
@@ -45,10 +45,22 @@ fn is_data_route_queue_full(result: RouteResult) -> bool {
     )
 }
 
-fn should_close_on_queue_full_streak(streak: u8) -> bool {
+fn should_close_on_queue_full_streak(streak: u8, pressure_state: PressureState) -> bool {
+    if pressure_state < PressureState::Shedding {
+        return false;
+    }
+
     streak >= DATA_ROUTE_QUEUE_FULL_STARVATION_THRESHOLD
 }
 
+fn should_schedule_fairness_retry(snapshot: &WorkerFairnessSnapshot) -> bool {
+    snapshot.total_queued_bytes > 0
+}
+
+fn fairness_retry_delay(route_wait_ms: u64) -> Duration {
+    Duration::from_millis(route_wait_ms.max(1))
+}
+
 async fn route_data_with_retry(
     reg: &ConnRegistry,
     conn_id: u64,
@@ -157,7 +169,7 @@ async fn drain_fairness_scheduler(
             break;
         };
         let cid = candidate.frame.conn_id;
-        let _pressure_state = candidate.pressure_state;
+        let pressure_state = candidate.pressure_state;
         let _flow_class = candidate.flow_class;
         let routed = route_data_with_retry(
             reg,
@@ -176,7 +188,7 @@ async fn drain_fairness_scheduler(
         if is_data_route_queue_full(routed) {
             let streak = data_route_queue_full_streak.entry(cid).or_insert(0);
             *streak = streak.saturating_add(1);
-            if should_close_on_queue_full_streak(*streak) {
+            if should_close_on_queue_full_streak(*streak, pressure_state) {
                 fairness.remove_flow(cid);
                 data_route_queue_full_streak.remove(&cid);
                 reg.unregister(cid).await;
@@ -231,10 +243,33 @@ pub(crate) async fn reader_loop(
     let mut fairness_snapshot = fairness.snapshot();
     loop {
         let mut tmp = [0u8; 65_536];
+        let backlog_retry_enabled = should_schedule_fairness_retry(&fairness_snapshot);
+        let backlog_retry_delay =
+            fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed));
+        let mut retry_only = false;
         let n = tokio::select! {
             res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?,
+            _ = tokio::time::sleep(backlog_retry_delay), if backlog_retry_enabled => {
+                retry_only = true;
+                0usize
+            },
             _ = cancel.cancelled() => return Ok(()),
         };
+        if retry_only {
+            let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
+            drain_fairness_scheduler(
+                &mut fairness,
+                reg.as_ref(),
+                &tx,
+                &mut data_route_queue_full_streak,
+                route_wait_ms,
+                stats.as_ref(),
+            )
+            .await;
+            let current_snapshot = fairness.snapshot();
+            apply_fairness_metrics_delta(stats.as_ref(), &mut fairness_snapshot, current_snapshot);
+            continue;
+        }
         if n == 0 {
             stats.increment_me_reader_eof_total();
             return Err(ProxyError::Io(std::io::Error::new(
@@ -317,12 +352,9 @@ pub(crate) async fn reader_loop(
                     stats.increment_me_route_drop_queue_full_high();
                     let streak = data_route_queue_full_streak.entry(cid).or_insert(0);
                     *streak = streak.saturating_add(1);
-                    if should_close_on_queue_full_streak(*streak)
-                        || matches!(
-                            admission,
-                            AdmissionDecision::RejectSaturated
-                                | AdmissionDecision::RejectStandingFlow
-                        )
+                    let pressure_state = fairness.pressure_state();
+                    if should_close_on_queue_full_streak(*streak, pressure_state)
+                        || matches!(admission, AdmissionDecision::RejectSaturated)
                     {
                         fairness.remove_flow(cid);
                         data_route_queue_full_streak.remove(&cid);
@@ -445,14 +477,18 @@ pub(crate) async fn reader_loop(
 #[cfg(test)]
 mod tests {
+    use std::time::Duration;
+
     use bytes::Bytes;
 
+    use super::PressureState;
     use crate::transport::middle_proxy::ConnRegistry;
 
     use super::{
-        MeResponse, RouteResult, is_data_route_queue_full, route_data_with_retry,
-        should_close_on_queue_full_streak, should_close_on_route_result_for_ack,
-        should_close_on_route_result_for_data,
+        MeResponse, RouteResult, WorkerFairnessSnapshot, fairness_retry_delay,
+        is_data_route_queue_full, route_data_with_retry, should_close_on_queue_full_streak,
+        should_close_on_route_result_for_ack, should_close_on_route_result_for_data,
+        should_schedule_fairness_retry,
     };
 
     #[test]
@@ ... @@
         assert!(is_data_route_queue_full(RouteResult::QueueFullBase));
         assert!(is_data_route_queue_full(RouteResult::QueueFullHigh));
         assert!(!is_data_route_queue_full(RouteResult::NoConn));
-        assert!(!should_close_on_queue_full_streak(1));
-        assert!(!should_close_on_queue_full_streak(2));
-        assert!(should_close_on_queue_full_streak(3));
-        assert!(should_close_on_queue_full_streak(u8::MAX));
+        assert!(!should_close_on_queue_full_streak(1, PressureState::Normal));
+        assert!(!should_close_on_queue_full_streak(2, PressureState::Pressured));
+        assert!(!should_close_on_queue_full_streak(3, PressureState::Pressured));
+        assert!(should_close_on_queue_full_streak(3, PressureState::Shedding));
+        assert!(should_close_on_queue_full_streak(
+            u8::MAX,
+            PressureState::Saturated
+        ));
+    }
+
+    #[test]
+    fn fairness_retry_is_scheduled_only_when_queue_has_pending_bytes() {
+        let mut snapshot = WorkerFairnessSnapshot::default();
+        assert!(!should_schedule_fairness_retry(&snapshot));
+
+        snapshot.total_queued_bytes = 1;
+        assert!(should_schedule_fairness_retry(&snapshot));
+    }
+
+    #[test]
+    fn fairness_retry_delay_never_drops_below_one_millisecond() {
+        assert_eq!(fairness_retry_delay(0), Duration::from_millis(1));
+        assert_eq!(fairness_retry_delay(2), Duration::from_millis(2));
     }
 
     #[test]
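
Note on the select pattern this patch relies on: the new timer arm uses the
`, if` precondition of `tokio::select!`, which disables that branch entirely
whenever the fairness queue holds no bytes, so an idle reader still parks only
on the socket and the cancellation token. Below is a minimal, self-contained
sketch of the same shape; `has_backlog` and `drain_backlog` are hypothetical
stand-ins for the fairness scheduler and are not names from this patch.

    use std::time::Duration;

    use tokio::io::AsyncReadExt;
    use tokio_util::sync::CancellationToken;

    fn has_backlog() -> bool {
        false // placeholder: would consult the scheduler snapshot
    }

    async fn drain_backlog() {
        // placeholder: would retry routing queued frames downstream
    }

    async fn read_loop<R>(mut rd: R, cancel: CancellationToken) -> std::io::Result<()>
    where
        R: tokio::io::AsyncRead + Unpin,
    {
        let mut buf = [0u8; 4096];
        loop {
            // Recomputed every iteration, like `backlog_retry_enabled` in the patch.
            let retry_enabled = has_backlog();
            tokio::select! {
                res = rd.read(&mut buf) => {
                    if res? == 0 {
                        return Ok(()); // peer closed the stream
                    }
                    // ... parse and route the frame ...
                }
                // When `retry_enabled` is false this arm is never polled, so an
                // idle connection does not wake on a timer at all.
                _ = tokio::time::sleep(Duration::from_millis(1)), if retry_enabled => {
                    drain_backlog().await;
                }
                _ = cancel.cancelled() => return Ok(()),
            }
        }
    }

The clamp to at least one millisecond in `fairness_retry_delay` keeps a zero
`route_wait_ms` from degenerating into a busy spin, which is the behavior the
`fairness_retry_delay_never_drops_below_one_millisecond` test pins down.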