Fairness Regression fixes

This commit is contained in:
Alexey
2026-04-21 01:11:43 +03:00
parent aace0129f8
commit 926e3aa987
17 changed files with 244 additions and 71 deletions

View File

@@ -21,6 +21,8 @@ const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_PER_CORE: u16 = 64;
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_PER_CORE: u16 = 64; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_PER_CORE: u16 = 64;
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_GLOBAL: u32 = 256; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_GLOBAL: u32 = 256;
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL: u32 = 256; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL: u32 = 256;
const DEFAULT_ME_ROUTE_BACKPRESSURE_ENABLED: bool = false;
const DEFAULT_ME_ROUTE_FAIRSHARE_ENABLED: bool = false;
const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 4096; const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 4096;
const DEFAULT_ME_ROUTE_CHANNEL_CAPACITY: usize = 768; const DEFAULT_ME_ROUTE_CHANNEL_CAPACITY: usize = 768;
const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 1024; const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 1024;
@@ -529,6 +531,14 @@ pub(crate) fn default_me_route_backpressure_base_timeout_ms() -> u64 {
25 25
} }
pub(crate) fn default_me_route_backpressure_enabled() -> bool {
DEFAULT_ME_ROUTE_BACKPRESSURE_ENABLED
}
pub(crate) fn default_me_route_fairshare_enabled() -> bool {
DEFAULT_ME_ROUTE_FAIRSHARE_ENABLED
}
pub(crate) fn default_me_route_backpressure_high_timeout_ms() -> u64 { pub(crate) fn default_me_route_backpressure_high_timeout_ms() -> u64 {
120 120
} }

View File

@@ -86,6 +86,8 @@ pub struct HotFields {
pub telemetry_user_enabled: bool, pub telemetry_user_enabled: bool,
pub telemetry_me_level: MeTelemetryLevel, pub telemetry_me_level: MeTelemetryLevel,
pub me_socks_kdf_policy: MeSocksKdfPolicy, pub me_socks_kdf_policy: MeSocksKdfPolicy,
pub me_route_backpressure_enabled: bool,
pub me_route_fairshare_enabled: bool,
pub me_floor_mode: MeFloorMode, pub me_floor_mode: MeFloorMode,
pub me_adaptive_floor_idle_secs: u64, pub me_adaptive_floor_idle_secs: u64,
pub me_adaptive_floor_min_writers_single_endpoint: u8, pub me_adaptive_floor_min_writers_single_endpoint: u8,
@@ -187,6 +189,8 @@ impl HotFields {
telemetry_user_enabled: cfg.general.telemetry.user_enabled, telemetry_user_enabled: cfg.general.telemetry.user_enabled,
telemetry_me_level: cfg.general.telemetry.me_level, telemetry_me_level: cfg.general.telemetry.me_level,
me_socks_kdf_policy: cfg.general.me_socks_kdf_policy, me_socks_kdf_policy: cfg.general.me_socks_kdf_policy,
me_route_backpressure_enabled: cfg.general.me_route_backpressure_enabled,
me_route_fairshare_enabled: cfg.general.me_route_fairshare_enabled,
me_floor_mode: cfg.general.me_floor_mode, me_floor_mode: cfg.general.me_floor_mode,
me_adaptive_floor_idle_secs: cfg.general.me_adaptive_floor_idle_secs, me_adaptive_floor_idle_secs: cfg.general.me_adaptive_floor_idle_secs,
me_adaptive_floor_min_writers_single_endpoint: cfg me_adaptive_floor_min_writers_single_endpoint: cfg
@@ -529,6 +533,8 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
new.general.me_route_backpressure_high_timeout_ms; new.general.me_route_backpressure_high_timeout_ms;
cfg.general.me_route_backpressure_high_watermark_pct = cfg.general.me_route_backpressure_high_watermark_pct =
new.general.me_route_backpressure_high_watermark_pct; new.general.me_route_backpressure_high_watermark_pct;
cfg.general.me_route_backpressure_enabled = new.general.me_route_backpressure_enabled;
cfg.general.me_route_fairshare_enabled = new.general.me_route_fairshare_enabled;
cfg.general.me_reader_route_data_wait_ms = new.general.me_reader_route_data_wait_ms; cfg.general.me_reader_route_data_wait_ms = new.general.me_reader_route_data_wait_ms;
cfg.general.me_d2c_flush_batch_max_frames = new.general.me_d2c_flush_batch_max_frames; cfg.general.me_d2c_flush_batch_max_frames = new.general.me_d2c_flush_batch_max_frames;
cfg.general.me_d2c_flush_batch_max_bytes = new.general.me_d2c_flush_batch_max_bytes; cfg.general.me_d2c_flush_batch_max_bytes = new.general.me_d2c_flush_batch_max_bytes;
@@ -1053,6 +1059,8 @@ fn log_changes(
!= new_hot.me_route_backpressure_high_timeout_ms != new_hot.me_route_backpressure_high_timeout_ms
|| old_hot.me_route_backpressure_high_watermark_pct || old_hot.me_route_backpressure_high_watermark_pct
!= new_hot.me_route_backpressure_high_watermark_pct != new_hot.me_route_backpressure_high_watermark_pct
|| old_hot.me_route_backpressure_enabled != new_hot.me_route_backpressure_enabled
|| old_hot.me_route_fairshare_enabled != new_hot.me_route_fairshare_enabled
|| old_hot.me_reader_route_data_wait_ms != new_hot.me_reader_route_data_wait_ms || old_hot.me_reader_route_data_wait_ms != new_hot.me_reader_route_data_wait_ms
|| old_hot.me_health_interval_ms_unhealthy != new_hot.me_health_interval_ms_unhealthy || old_hot.me_health_interval_ms_unhealthy != new_hot.me_health_interval_ms_unhealthy
|| old_hot.me_health_interval_ms_healthy != new_hot.me_health_interval_ms_healthy || old_hot.me_health_interval_ms_healthy != new_hot.me_health_interval_ms_healthy
@@ -1060,10 +1068,12 @@ fn log_changes(
|| old_hot.me_warn_rate_limit_ms != new_hot.me_warn_rate_limit_ms || old_hot.me_warn_rate_limit_ms != new_hot.me_warn_rate_limit_ms
{ {
info!( info!(
"config reload: me_route_backpressure: base={}ms high={}ms watermark={}%; me_reader_route_data_wait_ms={}; me_health_interval: unhealthy={}ms healthy={}ms; me_admission_poll={}ms; me_warn_rate_limit={}ms", "config reload: me_route_backpressure: enabled={} base={}ms high={}ms watermark={}%; me_route_fairshare_enabled={}; me_reader_route_data_wait_ms={}; me_health_interval: unhealthy={}ms healthy={}ms; me_admission_poll={}ms; me_warn_rate_limit={}ms",
new_hot.me_route_backpressure_enabled,
new_hot.me_route_backpressure_base_timeout_ms, new_hot.me_route_backpressure_base_timeout_ms,
new_hot.me_route_backpressure_high_timeout_ms, new_hot.me_route_backpressure_high_timeout_ms,
new_hot.me_route_backpressure_high_watermark_pct, new_hot.me_route_backpressure_high_watermark_pct,
new_hot.me_route_fairshare_enabled,
new_hot.me_reader_route_data_wait_ms, new_hot.me_reader_route_data_wait_ms,
new_hot.me_health_interval_ms_unhealthy, new_hot.me_health_interval_ms_unhealthy,
new_hot.me_health_interval_ms_healthy, new_hot.me_health_interval_ms_healthy,

View File

@@ -729,6 +729,14 @@ pub struct GeneralConfig {
#[serde(default)] #[serde(default)]
pub me_socks_kdf_policy: MeSocksKdfPolicy, pub me_socks_kdf_policy: MeSocksKdfPolicy,
/// Enable route-level ME backpressure controls in reader fairness path.
#[serde(default = "default_me_route_backpressure_enabled")]
pub me_route_backpressure_enabled: bool,
/// Enable worker-local fairshare scheduler for ME reader routing.
#[serde(default = "default_me_route_fairshare_enabled")]
pub me_route_fairshare_enabled: bool,
/// Base backpressure timeout in milliseconds for ME route channel send. /// Base backpressure timeout in milliseconds for ME route channel send.
#[serde(default = "default_me_route_backpressure_base_timeout_ms")] #[serde(default = "default_me_route_backpressure_base_timeout_ms")]
pub me_route_backpressure_base_timeout_ms: u64, pub me_route_backpressure_base_timeout_ms: u64,
@@ -1059,6 +1067,8 @@ impl Default for GeneralConfig {
disable_colors: false, disable_colors: false,
telemetry: TelemetryConfig::default(), telemetry: TelemetryConfig::default(),
me_socks_kdf_policy: MeSocksKdfPolicy::Strict, me_socks_kdf_policy: MeSocksKdfPolicy::Strict,
me_route_backpressure_enabled: default_me_route_backpressure_enabled(),
me_route_fairshare_enabled: default_me_route_fairshare_enabled(),
me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(), me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(),
me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(), me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(),
me_route_backpressure_high_watermark_pct: me_route_backpressure_high_watermark_pct:

View File

@@ -277,6 +277,8 @@ pub(crate) async fn initialize_me_pool(
config.general.me_socks_kdf_policy, config.general.me_socks_kdf_policy,
config.general.me_writer_cmd_channel_capacity, config.general.me_writer_cmd_channel_capacity,
config.general.me_route_channel_capacity, config.general.me_route_channel_capacity,
config.general.me_route_backpressure_enabled,
config.general.me_route_fairshare_enabled,
config.general.me_route_backpressure_base_timeout_ms, config.general.me_route_backpressure_base_timeout_ms,
config.general.me_route_backpressure_high_timeout_ms, config.general.me_route_backpressure_high_timeout_ms,
config.general.me_route_backpressure_high_watermark_pct, config.general.me_route_backpressure_high_watermark_pct,

View File

@@ -122,6 +122,8 @@ pub(crate) async fn spawn_runtime_tasks(
if let Some(pool) = &me_pool_for_policy { if let Some(pool) = &me_pool_for_policy {
pool.update_runtime_transport_policy( pool.update_runtime_transport_policy(
cfg.general.me_socks_kdf_policy, cfg.general.me_socks_kdf_policy,
cfg.general.me_route_backpressure_enabled,
cfg.general.me_route_fairshare_enabled,
cfg.general.me_route_backpressure_base_timeout_ms, cfg.general.me_route_backpressure_base_timeout_ms,
cfg.general.me_route_backpressure_high_timeout_ms, cfg.general.me_route_backpressure_high_timeout_ms,
cfg.general.me_route_backpressure_high_watermark_pct, cfg.general.me_route_backpressure_high_watermark_pct,

View File

@@ -12,6 +12,7 @@ pub(crate) struct PressureSignals {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct PressureConfig { pub(crate) struct PressureConfig {
pub(crate) backpressure_enabled: bool,
pub(crate) evaluate_every_rounds: u32, pub(crate) evaluate_every_rounds: u32,
pub(crate) transition_hysteresis_rounds: u8, pub(crate) transition_hysteresis_rounds: u8,
pub(crate) standing_ratio_pressured_pct: u8, pub(crate) standing_ratio_pressured_pct: u8,
@@ -32,6 +33,7 @@ pub(crate) struct PressureConfig {
impl Default for PressureConfig { impl Default for PressureConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
backpressure_enabled: true,
evaluate_every_rounds: 8, evaluate_every_rounds: 8,
transition_hysteresis_rounds: 3, transition_hysteresis_rounds: 3,
standing_ratio_pressured_pct: 20, standing_ratio_pressured_pct: 20,
@@ -99,6 +101,13 @@ impl PressureEvaluator {
force: bool, force: bool,
) -> PressureState { ) -> PressureState {
self.rotate_window_if_needed(now, cfg); self.rotate_window_if_needed(now, cfg);
if !cfg.backpressure_enabled {
self.state = PressureState::Normal;
self.candidate_state = PressureState::Normal;
self.candidate_hits = 0;
self.rounds_since_eval = 0;
return self.state;
}
self.rounds_since_eval = self.rounds_since_eval.saturating_add(1); self.rounds_since_eval = self.rounds_since_eval.saturating_add(1);
if !force && self.rounds_since_eval < cfg.evaluate_every_rounds.max(1) { if !force && self.rounds_since_eval < cfg.evaluate_every_rounds.max(1) {
return self.state; return self.state;
@@ -133,6 +142,10 @@ impl PressureEvaluator {
max_total_queued_bytes: u64, max_total_queued_bytes: u64,
signals: PressureSignals, signals: PressureSignals,
) -> PressureState { ) -> PressureState {
if !cfg.backpressure_enabled {
return PressureState::Normal;
}
let queue_ratio_pct = if max_total_queued_bytes == 0 { let queue_ratio_pct = if max_total_queued_bytes == 0 {
100 100
} else { } else {
@@ -146,57 +159,59 @@ impl PressureEvaluator {
((signals.standing_flows.saturating_mul(100)) / signals.active_flows).min(100) as u8 ((signals.standing_flows.saturating_mul(100)) / signals.active_flows).min(100) as u8
}; };
let mut pressured = false; let mut pressure_score = 0u8;
let mut saturated = false;
let queue_saturated_pct = cfg
.queue_ratio_shedding_pct
.min(cfg.queue_ratio_saturated_pct);
if queue_ratio_pct >= cfg.queue_ratio_pressured_pct { if queue_ratio_pct >= cfg.queue_ratio_pressured_pct {
pressured = true; pressure_score = pressure_score.max(1);
} }
if queue_ratio_pct >= queue_saturated_pct { if queue_ratio_pct >= cfg.queue_ratio_shedding_pct {
saturated = true; pressure_score = pressure_score.max(2);
}
if queue_ratio_pct >= cfg.queue_ratio_saturated_pct {
pressure_score = pressure_score.max(3);
} }
let standing_saturated_pct = cfg
.standing_ratio_shedding_pct
.min(cfg.standing_ratio_saturated_pct);
if standing_ratio_pct >= cfg.standing_ratio_pressured_pct { if standing_ratio_pct >= cfg.standing_ratio_pressured_pct {
pressured = true; pressure_score = pressure_score.max(1);
} }
if standing_ratio_pct >= standing_saturated_pct { if standing_ratio_pct >= cfg.standing_ratio_shedding_pct {
saturated = true; pressure_score = pressure_score.max(2);
}
if standing_ratio_pct >= cfg.standing_ratio_saturated_pct {
pressure_score = pressure_score.max(3);
} }
let rejects_saturated = cfg.rejects_shedding.min(cfg.rejects_saturated);
if self.admission_rejects_window >= cfg.rejects_pressured { if self.admission_rejects_window >= cfg.rejects_pressured {
pressured = true; pressure_score = pressure_score.max(1);
} }
if self.admission_rejects_window >= rejects_saturated { if self.admission_rejects_window >= cfg.rejects_shedding {
saturated = true; pressure_score = pressure_score.max(2);
}
if self.admission_rejects_window >= cfg.rejects_saturated {
pressure_score = pressure_score.max(3);
} }
let stalls_saturated = cfg.stalls_shedding.min(cfg.stalls_saturated);
if self.route_stalls_window >= cfg.stalls_pressured { if self.route_stalls_window >= cfg.stalls_pressured {
pressured = true; pressure_score = pressure_score.max(1);
} }
if self.route_stalls_window >= stalls_saturated { if self.route_stalls_window >= cfg.stalls_shedding {
saturated = true; pressure_score = pressure_score.max(2);
}
if self.route_stalls_window >= cfg.stalls_saturated {
pressure_score = pressure_score.max(3);
} }
if signals.backpressured_flows > signals.active_flows.saturating_div(2) if signals.backpressured_flows > signals.active_flows.saturating_div(2)
&& signals.active_flows > 0 && signals.active_flows > 0
{ {
pressured = true; pressure_score = pressure_score.max(2);
} }
if saturated { match pressure_score {
PressureState::Saturated 0 => PressureState::Normal,
} else if pressured { 1 => PressureState::Pressured,
PressureState::Pressured 2 => PressureState::Shedding,
} else { _ => PressureState::Saturated,
PressureState::Normal
} }
} }

View File

@@ -14,6 +14,7 @@ use super::pressure::{PressureConfig, PressureEvaluator, PressureSignals};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct WorkerFairnessConfig { pub(crate) struct WorkerFairnessConfig {
pub(crate) worker_id: u16, pub(crate) worker_id: u16,
pub(crate) backpressure_enabled: bool,
pub(crate) max_active_flows: usize, pub(crate) max_active_flows: usize,
pub(crate) max_total_queued_bytes: u64, pub(crate) max_total_queued_bytes: u64,
pub(crate) max_flow_queued_bytes: u64, pub(crate) max_flow_queued_bytes: u64,
@@ -36,6 +37,7 @@ impl Default for WorkerFairnessConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
worker_id: 0, worker_id: 0,
backpressure_enabled: true,
max_active_flows: 4096, max_active_flows: 4096,
max_total_queued_bytes: 16 * 1024 * 1024, max_total_queued_bytes: 16 * 1024 * 1024,
max_flow_queued_bytes: 512 * 1024, max_flow_queued_bytes: 512 * 1024,
@@ -107,7 +109,8 @@ pub(crate) struct WorkerFairnessState {
} }
impl WorkerFairnessState { impl WorkerFairnessState {
pub(crate) fn new(config: WorkerFairnessConfig, now: Instant) -> Self { pub(crate) fn new(mut config: WorkerFairnessConfig, now: Instant) -> Self {
config.pressure.backpressure_enabled = config.backpressure_enabled;
let bucket_count = config.soft_bucket_count.max(1); let bucket_count = config.soft_bucket_count.max(1);
Self { Self {
config, config,
@@ -134,6 +137,12 @@ impl WorkerFairnessState {
self.pressure.state() self.pressure.state()
} }
pub(crate) fn set_backpressure_enabled(&mut self, enabled: bool) {
self.config.backpressure_enabled = enabled;
self.config.pressure.backpressure_enabled = enabled;
self.evaluate_pressure(Instant::now(), true);
}
pub(crate) fn snapshot(&self) -> WorkerFairnessSnapshot { pub(crate) fn snapshot(&self) -> WorkerFairnessSnapshot {
WorkerFairnessSnapshot { WorkerFairnessSnapshot {
pressure_state: self.pressure.state(), pressure_state: self.pressure.state(),
@@ -166,7 +175,7 @@ impl WorkerFairnessState {
}; };
let frame_bytes = frame.queued_bytes(); let frame_bytes = frame.queued_bytes();
if self.pressure.state() == PressureState::Saturated { if self.config.backpressure_enabled && self.pressure.state() == PressureState::Saturated {
self.pressure self.pressure
.note_admission_reject(now, &self.config.pressure); .note_admission_reject(now, &self.config.pressure);
self.enqueue_rejects = self.enqueue_rejects.saturating_add(1); self.enqueue_rejects = self.enqueue_rejects.saturating_add(1);
@@ -231,7 +240,8 @@ impl WorkerFairnessState {
return AdmissionDecision::RejectFlowCap; return AdmissionDecision::RejectFlowCap;
} }
if self.pressure.state() >= PressureState::Shedding if self.config.backpressure_enabled
&& self.pressure.state() >= PressureState::Shedding
&& entry.fairness.standing_state == StandingQueueState::Standing && entry.fairness.standing_state == StandingQueueState::Standing
{ {
self.pressure self.pressure
@@ -422,8 +432,10 @@ impl WorkerFairnessState {
DispatchAction::Continue DispatchAction::Continue
} }
DispatchFeedback::QueueFull => { DispatchFeedback::QueueFull => {
if self.config.backpressure_enabled {
self.pressure.note_route_stall(now, &self.config.pressure); self.pressure.note_route_stall(now, &self.config.pressure);
self.downstream_stalls = self.downstream_stalls.saturating_add(1); self.downstream_stalls = self.downstream_stalls.saturating_add(1);
}
let state = self.pressure.state(); let state = self.pressure.state();
let Some(flow) = self.flows.get_mut(&conn_id) else { let Some(flow) = self.flows.get_mut(&conn_id) else {
self.evaluate_pressure(now, true); self.evaluate_pressure(now, true);
@@ -433,16 +445,19 @@ impl WorkerFairnessState {
let before_membership = Self::flow_membership(&flow.fairness); let before_membership = Self::flow_membership(&flow.fairness);
let mut enqueue_active = false; let mut enqueue_active = false;
if self.config.backpressure_enabled {
flow.fairness.consecutive_stalls = flow.fairness.consecutive_stalls =
flow.fairness.consecutive_stalls.saturating_add(1); flow.fairness.consecutive_stalls.saturating_add(1);
flow.fairness.scheduler_state = FlowSchedulerState::Backpressured; flow.fairness.scheduler_state = FlowSchedulerState::Backpressured;
flow.fairness.pressure_class = FlowPressureClass::Backpressured; flow.fairness.pressure_class = FlowPressureClass::Backpressured;
}
let should_shed_frame = matches!(state, PressureState::Saturated) let should_shed_frame = self.config.backpressure_enabled
&& (matches!(state, PressureState::Saturated)
|| (matches!(state, PressureState::Shedding) || (matches!(state, PressureState::Shedding)
&& flow.fairness.standing_state == StandingQueueState::Standing && flow.fairness.standing_state == StandingQueueState::Standing
&& flow.fairness.consecutive_stalls && flow.fairness.consecutive_stalls
>= self.config.max_consecutive_stalls_before_shed); >= self.config.max_consecutive_stalls_before_shed));
if should_shed_frame { if should_shed_frame {
self.shed_drops = self.shed_drops.saturating_add(1); self.shed_drops = self.shed_drops.saturating_add(1);
@@ -467,7 +482,8 @@ impl WorkerFairnessState {
Self::classify_flow(&self.config, state, now, &mut flow.fairness); Self::classify_flow(&self.config, state, now, &mut flow.fairness);
let after_membership = Self::flow_membership(&flow.fairness); let after_membership = Self::flow_membership(&flow.fairness);
let should_close_flow = flow.fairness.consecutive_stalls let should_close_flow = self.config.backpressure_enabled
&& flow.fairness.consecutive_stalls
>= self.config.max_consecutive_stalls_before_close >= self.config.max_consecutive_stalls_before_close
&& self.pressure.state() == PressureState::Saturated; && self.pressure.state() == PressureState::Saturated;
( (

View File

@@ -1794,6 +1794,8 @@ mod tests {
MeSocksKdfPolicy::default(), MeSocksKdfPolicy::default(),
general.me_writer_cmd_channel_capacity, general.me_writer_cmd_channel_capacity,
general.me_route_channel_capacity, general.me_route_channel_capacity,
general.me_route_backpressure_enabled,
general.me_route_fairshare_enabled,
general.me_route_backpressure_base_timeout_ms, general.me_route_backpressure_base_timeout_ms,
general.me_route_backpressure_high_timeout_ms, general.me_route_backpressure_high_timeout_ms,
general.me_route_backpressure_high_watermark_pct, general.me_route_backpressure_high_watermark_pct,

View File

@@ -396,6 +396,8 @@ pub(super) struct WriterSelectionPolicyCore {
pub(super) struct TransportPolicyCore { pub(super) struct TransportPolicyCore {
pub(super) me_socks_kdf_policy: AtomicU8, pub(super) me_socks_kdf_policy: AtomicU8,
pub(super) me_route_backpressure_enabled: Arc<AtomicBool>,
pub(super) me_route_fairshare_enabled: Arc<AtomicBool>,
pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>, pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>,
} }
@@ -548,6 +550,8 @@ impl MePool {
me_socks_kdf_policy: MeSocksKdfPolicy, me_socks_kdf_policy: MeSocksKdfPolicy,
me_writer_cmd_channel_capacity: usize, me_writer_cmd_channel_capacity: usize,
me_route_channel_capacity: usize, me_route_channel_capacity: usize,
me_route_backpressure_enabled: bool,
me_route_fairshare_enabled: bool,
me_route_backpressure_base_timeout_ms: u64, me_route_backpressure_base_timeout_ms: u64,
me_route_backpressure_high_timeout_ms: u64, me_route_backpressure_high_timeout_ms: u64,
me_route_backpressure_high_watermark_pct: u8, me_route_backpressure_high_watermark_pct: u8,
@@ -783,6 +787,10 @@ impl MePool {
}), }),
transport_policy: Arc::new(TransportPolicyCore { transport_policy: Arc::new(TransportPolicyCore {
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
me_route_backpressure_enabled: Arc::new(AtomicBool::new(
me_route_backpressure_enabled,
)),
me_route_fairshare_enabled: Arc::new(AtomicBool::new(me_route_fairshare_enabled)),
me_reader_route_data_wait_ms: Arc::new(AtomicU64::new( me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(
me_reader_route_data_wait_ms, me_reader_route_data_wait_ms,
)), )),
@@ -1245,6 +1253,8 @@ impl MePool {
pub fn update_runtime_transport_policy( pub fn update_runtime_transport_policy(
&self, &self,
socks_kdf_policy: MeSocksKdfPolicy, socks_kdf_policy: MeSocksKdfPolicy,
route_backpressure_enabled: bool,
route_fairshare_enabled: bool,
route_backpressure_base_timeout_ms: u64, route_backpressure_base_timeout_ms: u64,
route_backpressure_high_timeout_ms: u64, route_backpressure_high_timeout_ms: u64,
route_backpressure_high_watermark_pct: u8, route_backpressure_high_watermark_pct: u8,
@@ -1253,6 +1263,12 @@ impl MePool {
self.transport_policy self.transport_policy
.me_socks_kdf_policy .me_socks_kdf_policy
.store(socks_kdf_policy.as_u8(), Ordering::Relaxed); .store(socks_kdf_policy.as_u8(), Ordering::Relaxed);
self.transport_policy
.me_route_backpressure_enabled
.store(route_backpressure_enabled, Ordering::Relaxed);
self.transport_policy
.me_route_fairshare_enabled
.store(route_fairshare_enabled, Ordering::Relaxed);
self.transport_policy self.transport_policy
.me_reader_route_data_wait_ms .me_reader_route_data_wait_ms
.store(reader_route_data_wait_ms, Ordering::Relaxed); .store(reader_route_data_wait_ms, Ordering::Relaxed);

View File

@@ -436,6 +436,11 @@ impl MePool {
let cancel_signal = cancel.clone(); let cancel_signal = cancel.clone();
let cancel_select = cancel.clone(); let cancel_select = cancel.clone();
let cancel_cleanup = cancel.clone(); let cancel_cleanup = cancel.clone();
let route_backpressure_enabled = self
.transport_policy
.me_route_backpressure_enabled
.clone();
let route_fairshare_enabled = self.transport_policy.me_route_fairshare_enabled.clone();
let reader_route_data_wait_ms = self.transport_policy.me_reader_route_data_wait_ms.clone(); let reader_route_data_wait_ms = self.transport_policy.me_reader_route_data_wait_ms.clone();
tokio::spawn(async move { tokio::spawn(async move {
@@ -458,6 +463,8 @@ impl MePool {
writer_id, writer_id,
degraded, degraded,
rtt_ema_ms_x10, rtt_ema_ms_x10,
route_backpressure_enabled,
route_fairshare_enabled,
reader_route_data_wait_ms, reader_route_data_wait_ms,
cancel_reader, cancel_reader,
) => WriterLifecycleExit::Reader(reader_res), ) => WriterLifecycleExit::Reader(reader_res),

View File

@@ -45,7 +45,15 @@ fn is_data_route_queue_full(result: RouteResult) -> bool {
) )
} }
fn should_close_on_queue_full_streak(streak: u8, pressure_state: PressureState) -> bool { fn should_close_on_queue_full_streak_with_policy(
streak: u8,
pressure_state: PressureState,
backpressure_enabled: bool,
) -> bool {
if !backpressure_enabled {
return false;
}
if pressure_state < PressureState::Shedding { if pressure_state < PressureState::Shedding {
return false; return false;
} }
@@ -160,6 +168,7 @@ async fn drain_fairness_scheduler(
reg: &ConnRegistry, reg: &ConnRegistry,
tx: &mpsc::Sender<WriterCommand>, tx: &mpsc::Sender<WriterCommand>,
data_route_queue_full_streak: &mut HashMap<u64, u8>, data_route_queue_full_streak: &mut HashMap<u64, u8>,
backpressure_enabled: bool,
route_wait_ms: u64, route_wait_ms: u64,
stats: &Stats, stats: &Stats,
) { ) {
@@ -188,7 +197,11 @@ async fn drain_fairness_scheduler(
if is_data_route_queue_full(routed) { if is_data_route_queue_full(routed) {
let streak = data_route_queue_full_streak.entry(cid).or_insert(0); let streak = data_route_queue_full_streak.entry(cid).or_insert(0);
*streak = streak.saturating_add(1); *streak = streak.saturating_add(1);
if should_close_on_queue_full_streak(*streak, pressure_state) { if should_close_on_queue_full_streak_with_policy(
*streak,
pressure_state,
backpressure_enabled,
) {
fairness.remove_flow(cid); fairness.remove_flow(cid);
data_route_queue_full_streak.remove(&cid); data_route_queue_full_streak.remove(&cid);
reg.unregister(cid).await; reg.unregister(cid).await;
@@ -220,6 +233,8 @@ pub(crate) async fn reader_loop(
writer_id: u64, writer_id: u64,
degraded: Arc<AtomicBool>, degraded: Arc<AtomicBool>,
writer_rtt_ema_ms_x10: Arc<AtomicU32>, writer_rtt_ema_ms_x10: Arc<AtomicU32>,
route_backpressure_enabled: Arc<AtomicBool>,
route_fairshare_enabled: Arc<AtomicBool>,
reader_route_data_wait_ms: Arc<AtomicU64>, reader_route_data_wait_ms: Arc<AtomicU64>,
cancel: CancellationToken, cancel: CancellationToken,
) -> Result<()> { ) -> Result<()> {
@@ -236,14 +251,20 @@ pub(crate) async fn reader_loop(
max_flow_queued_bytes: (reg.route_channel_capacity() as u64) max_flow_queued_bytes: (reg.route_channel_capacity() as u64)
.saturating_mul(2 * 1024) .saturating_mul(2 * 1024)
.clamp(64 * 1024, 2 * 1024 * 1024), .clamp(64 * 1024, 2 * 1024 * 1024),
backpressure_enabled: route_backpressure_enabled.load(Ordering::Relaxed),
..WorkerFairnessConfig::default() ..WorkerFairnessConfig::default()
}, },
Instant::now(), Instant::now(),
); );
let mut fairness_snapshot = fairness.snapshot(); let mut fairness_snapshot = fairness.snapshot();
loop { loop {
let backpressure_enabled = route_backpressure_enabled.load(Ordering::Relaxed);
let fairshare_enabled = route_fairshare_enabled.load(Ordering::Relaxed);
fairness.set_backpressure_enabled(backpressure_enabled);
let fairness_has_backlog = should_schedule_fairness_retry(&fairness_snapshot);
let fairshare_active = fairshare_enabled || fairness_has_backlog;
let mut tmp = [0u8; 65_536]; let mut tmp = [0u8; 65_536];
let backlog_retry_enabled = should_schedule_fairness_retry(&fairness_snapshot); let backlog_retry_enabled = fairshare_active && fairness_has_backlog;
let backlog_retry_delay = let backlog_retry_delay =
fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed)); fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed));
let mut retry_only = false; let mut retry_only = false;
@@ -262,6 +283,7 @@ pub(crate) async fn reader_loop(
reg.as_ref(), reg.as_ref(),
&tx, &tx,
&mut data_route_queue_full_streak, &mut data_route_queue_full_streak,
backpressure_enabled,
route_wait_ms, route_wait_ms,
stats.as_ref(), stats.as_ref(),
) )
@@ -346,6 +368,7 @@ pub(crate) async fn reader_loop(
let data = body.slice(12..); let data = body.slice(12..);
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS"); trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
if fairshare_active {
let admission = fairness.enqueue_data(cid, flags, data, Instant::now()); let admission = fairness.enqueue_data(cid, flags, data, Instant::now());
if !matches!(admission, AdmissionDecision::Admit) { if !matches!(admission, AdmissionDecision::Admit) {
stats.increment_me_route_drop_queue_full(); stats.increment_me_route_drop_queue_full();
@@ -353,8 +376,12 @@ pub(crate) async fn reader_loop(
let streak = data_route_queue_full_streak.entry(cid).or_insert(0); let streak = data_route_queue_full_streak.entry(cid).or_insert(0);
*streak = streak.saturating_add(1); *streak = streak.saturating_add(1);
let pressure_state = fairness.pressure_state(); let pressure_state = fairness.pressure_state();
if should_close_on_queue_full_streak(*streak, pressure_state) if should_close_on_queue_full_streak_with_policy(
|| matches!(admission, AdmissionDecision::RejectSaturated) *streak,
pressure_state,
backpressure_enabled,
) || (backpressure_enabled
&& matches!(admission, AdmissionDecision::RejectSaturated))
{ {
fairness.remove_flow(cid); fairness.remove_flow(cid);
data_route_queue_full_streak.remove(&cid); data_route_queue_full_streak.remove(&cid);
@@ -362,6 +389,37 @@ pub(crate) async fn reader_loop(
send_close_conn(&tx, cid).await; send_close_conn(&tx, cid).await;
} }
} }
} else {
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
let routed =
route_data_with_retry(reg.as_ref(), cid, flags, data, route_wait_ms).await;
if matches!(routed, RouteResult::Routed) {
data_route_queue_full_streak.remove(&cid);
continue;
}
report_route_drop(routed, stats.as_ref());
if should_close_on_route_result_for_data(routed) {
fairness.remove_flow(cid);
data_route_queue_full_streak.remove(&cid);
reg.unregister(cid).await;
send_close_conn(&tx, cid).await;
continue;
}
if is_data_route_queue_full(routed) {
let streak = data_route_queue_full_streak.entry(cid).or_insert(0);
*streak = streak.saturating_add(1);
if should_close_on_queue_full_streak_with_policy(
*streak,
PressureState::Shedding,
backpressure_enabled,
) {
fairness.remove_flow(cid);
data_route_queue_full_streak.remove(&cid);
reg.unregister(cid).await;
send_close_conn(&tx, cid).await;
}
}
}
} else if pt == RPC_SIMPLE_ACK_U32 && body.len() >= 12 { } else if pt == RPC_SIMPLE_ACK_U32 && body.len() >= 12 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
let cfm = u32::from_le_bytes(body[8..12].try_into().unwrap()); let cfm = u32::from_le_bytes(body[8..12].try_into().unwrap());
@@ -465,6 +523,7 @@ pub(crate) async fn reader_loop(
reg.as_ref(), reg.as_ref(),
&tx, &tx,
&mut data_route_queue_full_streak, &mut data_route_queue_full_streak,
backpressure_enabled,
route_wait_ms, route_wait_ms,
stats.as_ref(), stats.as_ref(),
) )
@@ -486,9 +545,8 @@ mod tests {
use super::{ use super::{
MeResponse, RouteResult, WorkerFairnessSnapshot, fairness_retry_delay, MeResponse, RouteResult, WorkerFairnessSnapshot, fairness_retry_delay,
is_data_route_queue_full, route_data_with_retry, should_close_on_queue_full_streak, is_data_route_queue_full, route_data_with_retry, should_close_on_queue_full_streak_with_policy,
should_close_on_route_result_for_ack, should_close_on_route_result_for_data, should_close_on_route_result_for_ack, should_close_on_route_result_for_data, should_schedule_fairness_retry,
should_schedule_fairness_retry,
}; };
#[test] #[test]
@@ -511,22 +569,35 @@ mod tests {
assert!(is_data_route_queue_full(RouteResult::QueueFullBase)); assert!(is_data_route_queue_full(RouteResult::QueueFullBase));
assert!(is_data_route_queue_full(RouteResult::QueueFullHigh)); assert!(is_data_route_queue_full(RouteResult::QueueFullHigh));
assert!(!is_data_route_queue_full(RouteResult::NoConn)); assert!(!is_data_route_queue_full(RouteResult::NoConn));
assert!(!should_close_on_queue_full_streak(1, PressureState::Normal)); assert!(!should_close_on_queue_full_streak_with_policy(
assert!(!should_close_on_queue_full_streak( 1,
PressureState::Normal,
true
));
assert!(!should_close_on_queue_full_streak_with_policy(
2, 2,
PressureState::Pressured PressureState::Pressured,
true
)); ));
assert!(!should_close_on_queue_full_streak( assert!(!should_close_on_queue_full_streak_with_policy(
3, 3,
PressureState::Pressured PressureState::Pressured,
true
)); ));
assert!(should_close_on_queue_full_streak( assert!(should_close_on_queue_full_streak_with_policy(
3, 3,
PressureState::Shedding PressureState::Shedding,
true
)); ));
assert!(should_close_on_queue_full_streak( assert!(should_close_on_queue_full_streak_with_policy(
u8::MAX, u8::MAX,
PressureState::Saturated PressureState::Saturated,
true
));
assert!(!should_close_on_queue_full_streak_with_policy(
u8::MAX,
PressureState::Saturated,
false
)); ));
} }

View File

@@ -104,6 +104,8 @@ async fn make_pool(
MeSocksKdfPolicy::default(), MeSocksKdfPolicy::default(),
general.me_writer_cmd_channel_capacity, general.me_writer_cmd_channel_capacity,
general.me_route_channel_capacity, general.me_route_channel_capacity,
general.me_route_backpressure_enabled,
general.me_route_fairshare_enabled,
general.me_route_backpressure_base_timeout_ms, general.me_route_backpressure_base_timeout_ms,
general.me_route_backpressure_high_timeout_ms, general.me_route_backpressure_high_timeout_ms,
general.me_route_backpressure_high_watermark_pct, general.me_route_backpressure_high_watermark_pct,

View File

@@ -102,6 +102,8 @@ async fn make_pool(
MeSocksKdfPolicy::default(), MeSocksKdfPolicy::default(),
general.me_writer_cmd_channel_capacity, general.me_writer_cmd_channel_capacity,
general.me_route_channel_capacity, general.me_route_channel_capacity,
general.me_route_backpressure_enabled,
general.me_route_fairshare_enabled,
general.me_route_backpressure_base_timeout_ms, general.me_route_backpressure_base_timeout_ms,
general.me_route_backpressure_high_timeout_ms, general.me_route_backpressure_high_timeout_ms,
general.me_route_backpressure_high_watermark_pct, general.me_route_backpressure_high_watermark_pct,

View File

@@ -97,6 +97,8 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
MeSocksKdfPolicy::default(), MeSocksKdfPolicy::default(),
general.me_writer_cmd_channel_capacity, general.me_writer_cmd_channel_capacity,
general.me_route_channel_capacity, general.me_route_channel_capacity,
general.me_route_backpressure_enabled,
general.me_route_fairshare_enabled,
general.me_route_backpressure_base_timeout_ms, general.me_route_backpressure_base_timeout_ms,
general.me_route_backpressure_high_timeout_ms, general.me_route_backpressure_high_timeout_ms,
general.me_route_backpressure_high_watermark_pct, general.me_route_backpressure_high_watermark_pct,

View File

@@ -86,6 +86,8 @@ async fn make_pool() -> Arc<MePool> {
MeSocksKdfPolicy::default(), MeSocksKdfPolicy::default(),
general.me_writer_cmd_channel_capacity, general.me_writer_cmd_channel_capacity,
general.me_route_channel_capacity, general.me_route_channel_capacity,
general.me_route_backpressure_enabled,
general.me_route_fairshare_enabled,
general.me_route_backpressure_base_timeout_ms, general.me_route_backpressure_base_timeout_ms,
general.me_route_backpressure_high_timeout_ms, general.me_route_backpressure_high_timeout_ms,
general.me_route_backpressure_high_watermark_pct, general.me_route_backpressure_high_watermark_pct,

View File

@@ -91,6 +91,8 @@ async fn make_pool() -> Arc<MePool> {
MeSocksKdfPolicy::default(), MeSocksKdfPolicy::default(),
general.me_writer_cmd_channel_capacity, general.me_writer_cmd_channel_capacity,
general.me_route_channel_capacity, general.me_route_channel_capacity,
general.me_route_backpressure_enabled,
general.me_route_fairshare_enabled,
general.me_route_backpressure_base_timeout_ms, general.me_route_backpressure_base_timeout_ms,
general.me_route_backpressure_high_timeout_ms, general.me_route_backpressure_high_timeout_ms,
general.me_route_backpressure_high_watermark_pct, general.me_route_backpressure_high_watermark_pct,

View File

@@ -97,6 +97,8 @@ async fn make_pool() -> (Arc<MePool>, Arc<SecureRandom>) {
MeSocksKdfPolicy::default(), MeSocksKdfPolicy::default(),
general.me_writer_cmd_channel_capacity, general.me_writer_cmd_channel_capacity,
general.me_route_channel_capacity, general.me_route_channel_capacity,
general.me_route_backpressure_enabled,
general.me_route_fairshare_enabled,
general.me_route_backpressure_base_timeout_ms, general.me_route_backpressure_base_timeout_ms,
general.me_route_backpressure_high_timeout_ms, general.me_route_backpressure_high_timeout_ms,
general.me_route_backpressure_high_watermark_pct, general.me_route_backpressure_high_watermark_pct,