From 9401c46727296d73fefdddf54243511ef33eba57 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 8 Mar 2026 03:05:47 +0300 Subject: [PATCH] ME Writer Pick --- src/config/defaults.rs | 9 +- src/config/hot_reload.rs | 19 +- src/config/load.rs | 6 + src/config/types.rs | 36 +++ src/main.rs | 2 + src/transport/middle_proxy/config_updater.rs | 4 + src/transport/middle_proxy/pool.rs | 31 ++- src/transport/middle_proxy/pool_status.rs | 28 +++ src/transport/middle_proxy/send.rs | 226 ++++++++++++++----- 9 files changed, 300 insertions(+), 61 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 798d881..68dd71e 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -21,9 +21,10 @@ const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_PER_CORE: u16 = 64; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_PER_CORE: u16 = 64; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_GLOBAL: u32 = 256; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL: u32 = 256; -const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 512; +const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 1024; const DEFAULT_ME_ROUTE_CHANNEL_CAPACITY: usize = 512; -const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 128; +const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 256; +const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3; const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000; const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000; const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000; @@ -315,6 +316,10 @@ pub(crate) fn default_me_c2me_channel_capacity() -> usize { DEFAULT_ME_C2ME_CHANNEL_CAPACITY } +pub(crate) fn default_me_writer_pick_sample_size() -> u8 { + DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE +} + pub(crate) fn default_me_health_interval_ms_unhealthy() -> u64 { DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index e7029a3..34b2d76 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -29,7 +29,10 @@ use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher}; use tokio::sync::{mpsc, watch}; use tracing::{error, info, warn}; -use crate::config::{LogLevel, MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel}; +use crate::config::{ + LogLevel, MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel, + MeWriterPickMode, +}; use super::load::ProxyConfig; // ── Hot fields ──────────────────────────────────────────────────────────────── @@ -57,6 +60,8 @@ pub struct HotFields { pub me_bind_stale_ttl_secs: u64, pub me_secret_atomic_snapshot: bool, pub me_deterministic_writer_sort: bool, + pub me_writer_pick_mode: MeWriterPickMode, + pub me_writer_pick_sample_size: u8, pub me_single_endpoint_shadow_writers: u8, pub me_single_endpoint_outage_mode_enabled: bool, pub me_single_endpoint_outage_disable_quarantine: bool, @@ -130,6 +135,8 @@ impl HotFields { me_bind_stale_ttl_secs: cfg.general.me_bind_stale_ttl_secs, me_secret_atomic_snapshot: cfg.general.me_secret_atomic_snapshot, me_deterministic_writer_sort: cfg.general.me_deterministic_writer_sort, + me_writer_pick_mode: cfg.general.me_writer_pick_mode, + me_writer_pick_sample_size: cfg.general.me_writer_pick_sample_size, me_single_endpoint_shadow_writers: cfg.general.me_single_endpoint_shadow_writers, me_single_endpoint_outage_mode_enabled: cfg .general @@ -292,6 +299,8 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig { cfg.general.me_bind_stale_ttl_secs = new.general.me_bind_stale_ttl_secs; cfg.general.me_secret_atomic_snapshot = new.general.me_secret_atomic_snapshot; cfg.general.me_deterministic_writer_sort = new.general.me_deterministic_writer_sort; + cfg.general.me_writer_pick_mode = new.general.me_writer_pick_mode; + cfg.general.me_writer_pick_sample_size = new.general.me_writer_pick_sample_size; cfg.general.me_single_endpoint_shadow_writers = new.general.me_single_endpoint_shadow_writers; cfg.general.me_single_endpoint_outage_mode_enabled = new.general.me_single_endpoint_outage_mode_enabled; @@ -683,11 +692,15 @@ fn log_changes( } if old_hot.me_secret_atomic_snapshot != new_hot.me_secret_atomic_snapshot || old_hot.me_deterministic_writer_sort != new_hot.me_deterministic_writer_sort + || old_hot.me_writer_pick_mode != new_hot.me_writer_pick_mode + || old_hot.me_writer_pick_sample_size != new_hot.me_writer_pick_sample_size { info!( - "config reload: me_runtime_flags: secret_atomic_snapshot={} deterministic_sort={}", + "config reload: me_runtime_flags: secret_atomic_snapshot={} deterministic_sort={} writer_pick_mode={:?} writer_pick_sample_size={}", new_hot.me_secret_atomic_snapshot, - new_hot.me_deterministic_writer_sort + new_hot.me_deterministic_writer_sort, + new_hot.me_writer_pick_mode, + new_hot.me_writer_pick_sample_size, ); } if old_hot.me_single_endpoint_shadow_writers != new_hot.me_single_endpoint_shadow_writers diff --git a/src/config/load.rs b/src/config/load.rs index c013b1a..623ec4d 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -519,6 +519,12 @@ impl ProxyConfig { )); } + if !(2..=4).contains(&config.general.me_writer_pick_sample_size) { + return Err(ProxyError::Config( + "general.me_writer_pick_sample_size must be within [2, 4]".to_string(), + )); + } + if config.general.me_route_inline_recovery_attempts == 0 { return Err(ProxyError::Config( "general.me_route_inline_recovery_attempts must be > 0".to_string(), diff --git a/src/config/types.rs b/src/config/types.rs index b2be9cf..588c82f 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -212,6 +212,32 @@ impl MeRouteNoWriterMode { } } +/// Middle-End writer selection mode for new client bindings. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum MeWriterPickMode { + SortedRr, + #[default] + P2c, +} + +impl MeWriterPickMode { + pub fn as_u8(self) -> u8 { + match self { + MeWriterPickMode::SortedRr => 0, + MeWriterPickMode::P2c => 1, + } + } + + pub fn from_u8(raw: u8) -> Self { + match raw { + 0 => MeWriterPickMode::SortedRr, + 1 => MeWriterPickMode::P2c, + _ => MeWriterPickMode::P2c, + } + } +} + /// Per-user unique source IP limit mode. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(rename_all = "snake_case")] @@ -782,6 +808,14 @@ pub struct GeneralConfig { #[serde(default = "default_me_deterministic_writer_sort")] pub me_deterministic_writer_sort: bool, + /// Writer selection mode for ME route bind path. + #[serde(default)] + pub me_writer_pick_mode: MeWriterPickMode, + + /// Number of candidates sampled by writer picker in `p2c` mode. + #[serde(default = "default_me_writer_pick_sample_size")] + pub me_writer_pick_sample_size: u8, + /// Enable NTP drift check at startup. #[serde(default = "default_ntp_check")] pub ntp_check: bool, @@ -912,6 +946,8 @@ impl Default for GeneralConfig { me_reinit_trigger_channel: default_me_reinit_trigger_channel(), me_reinit_coalesce_window_ms: default_me_reinit_coalesce_window_ms(), me_deterministic_writer_sort: default_me_deterministic_writer_sort(), + me_writer_pick_mode: MeWriterPickMode::default(), + me_writer_pick_sample_size: default_me_writer_pick_sample_size(), ntp_check: default_ntp_check(), ntp_servers: default_ntp_servers(), auto_degradation_enabled: default_true(), diff --git a/src/main.rs b/src/main.rs index c1059c3..d28fabe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1047,6 +1047,8 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_bind_stale_ttl_secs, config.general.me_secret_atomic_snapshot, config.general.me_deterministic_writer_sort, + config.general.me_writer_pick_mode, + config.general.me_writer_pick_sample_size, config.general.me_socks_kdf_policy, config.general.me_writer_cmd_channel_capacity, config.general.me_route_channel_capacity, diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 1bcda14..2c6a07a 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -306,6 +306,8 @@ async fn run_update_cycle( cfg.general.me_bind_stale_ttl_secs, cfg.general.me_secret_atomic_snapshot, cfg.general.me_deterministic_writer_sort, + cfg.general.me_writer_pick_mode, + cfg.general.me_writer_pick_sample_size, cfg.general.me_single_endpoint_shadow_writers, cfg.general.me_single_endpoint_outage_mode_enabled, cfg.general.me_single_endpoint_outage_disable_quarantine, @@ -530,6 +532,8 @@ pub async fn me_config_updater( cfg.general.me_bind_stale_ttl_secs, cfg.general.me_secret_atomic_snapshot, cfg.general.me_deterministic_writer_sort, + cfg.general.me_writer_pick_mode, + cfg.general.me_writer_pick_sample_size, cfg.general.me_single_endpoint_shadow_writers, cfg.general.me_single_endpoint_outage_mode_enabled, cfg.general.me_single_endpoint_outage_disable_quarantine, diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 0c9c30c..07ad67b 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -7,7 +7,9 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::{Mutex, Notify, RwLock, mpsc}; use tokio_util::sync::CancellationToken; -use crate::config::{MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy}; +use crate::config::{ + MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode, +}; use crate::crypto::SecureRandom; use crate::network::IpFamily; use crate::network::probe::NetworkDecision; @@ -39,6 +41,7 @@ pub struct MeWriter { pub tx: mpsc::Sender, pub cancel: CancellationToken, pub degraded: Arc, + pub rtt_ema_ms_x10: Arc, pub draining: Arc, pub draining_started_at_epoch_secs: Arc, pub drain_deadline_epoch_secs: Arc, @@ -177,6 +180,8 @@ pub struct MePool { pub(super) me_bind_stale_ttl_secs: AtomicU64, pub(super) secret_atomic_snapshot: AtomicBool, pub(super) me_deterministic_writer_sort: AtomicBool, + pub(super) me_writer_pick_mode: AtomicU8, + pub(super) me_writer_pick_sample_size: AtomicU8, pub(super) me_socks_kdf_policy: AtomicU8, pub(super) me_route_no_writer_mode: AtomicU8, pub(super) me_route_no_writer_wait: Duration, @@ -274,6 +279,8 @@ impl MePool { me_bind_stale_ttl_secs: u64, me_secret_atomic_snapshot: bool, me_deterministic_writer_sort: bool, + me_writer_pick_mode: MeWriterPickMode, + me_writer_pick_sample_size: u8, me_socks_kdf_policy: MeSocksKdfPolicy, me_writer_cmd_channel_capacity: usize, me_route_channel_capacity: usize, @@ -450,6 +457,8 @@ impl MePool { me_bind_stale_ttl_secs: AtomicU64::new(me_bind_stale_ttl_secs), secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot), me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort), + me_writer_pick_mode: AtomicU8::new(me_writer_pick_mode.as_u8()), + me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)), me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()), me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), @@ -489,6 +498,8 @@ impl MePool { bind_stale_ttl_secs: u64, secret_atomic_snapshot: bool, deterministic_writer_sort: bool, + writer_pick_mode: MeWriterPickMode, + writer_pick_sample_size: u8, single_endpoint_shadow_writers: u8, single_endpoint_outage_mode_enabled: bool, single_endpoint_outage_disable_quarantine: bool, @@ -535,6 +546,14 @@ impl MePool { .store(secret_atomic_snapshot, Ordering::Relaxed); self.me_deterministic_writer_sort .store(deterministic_writer_sort, Ordering::Relaxed); + let previous_writer_pick_mode = self.writer_pick_mode(); + self.me_writer_pick_mode + .store(writer_pick_mode.as_u8(), Ordering::Relaxed); + self.me_writer_pick_sample_size + .store(writer_pick_sample_size.clamp(2, 4), Ordering::Relaxed); + if previous_writer_pick_mode != writer_pick_mode { + self.stats.increment_me_writer_pick_mode_switch_total(); + } self.me_single_endpoint_shadow_writers .store(single_endpoint_shadow_writers, Ordering::Relaxed); self.me_single_endpoint_outage_mode_enabled @@ -692,6 +711,16 @@ impl MePool { MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed)) } + pub(super) fn writer_pick_mode(&self) -> MeWriterPickMode { + MeWriterPickMode::from_u8(self.me_writer_pick_mode.load(Ordering::Relaxed)) + } + + pub(super) fn writer_pick_sample_size(&self) -> usize { + self.me_writer_pick_sample_size + .load(Ordering::Relaxed) + .clamp(2, 4) as usize + } + pub(super) fn required_writers_for_dc(&self, endpoint_count: usize) -> usize { if endpoint_count == 0 { return 0; diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index cc1be5b..6673cf2 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -25,6 +25,7 @@ pub(crate) struct MeApiWriterStatusSnapshot { pub(crate) struct MeApiDcStatusSnapshot { pub dc: i16, pub endpoints: Vec, + pub endpoint_writers: Vec, pub available_endpoints: usize, pub available_pct: f64, pub required_writers: usize, @@ -38,6 +39,12 @@ pub(crate) struct MeApiDcStatusSnapshot { pub load: usize, } +#[derive(Clone, Debug)] +pub(crate) struct MeApiDcEndpointWriterSnapshot { + pub endpoint: SocketAddr, + pub active_writers: usize, +} + #[derive(Clone, Debug)] pub(crate) struct MeApiStatusSnapshot { pub generated_at_epoch_secs: u64, @@ -118,6 +125,8 @@ pub(crate) struct MeApiRuntimeSnapshot { pub me_single_endpoint_outage_backoff_max_ms: u64, pub me_single_endpoint_shadow_rotate_every_secs: u64, pub me_deterministic_writer_sort: bool, + pub me_writer_pick_mode: &'static str, + pub me_writer_pick_sample_size: u8, pub me_socks_kdf_policy: &'static str, pub quarantined_endpoints: Vec, pub network_path: Vec, @@ -338,6 +347,16 @@ impl MePool { dcs.push(MeApiDcStatusSnapshot { dc, + endpoint_writers: endpoints + .iter() + .map(|endpoint| MeApiDcEndpointWriterSnapshot { + endpoint: *endpoint, + active_writers: live_writers_by_dc_endpoint + .get(&(dc, *endpoint)) + .copied() + .unwrap_or(0), + }) + .collect(), endpoints: endpoints.into_iter().collect(), available_endpoints: dc_available_endpoints, available_pct: ratio_pct(dc_available_endpoints, endpoint_count), @@ -522,6 +541,8 @@ impl MePool { me_deterministic_writer_sort: self .me_deterministic_writer_sort .load(Ordering::Relaxed), + me_writer_pick_mode: writer_pick_mode_label(self.writer_pick_mode()), + me_writer_pick_sample_size: self.writer_pick_sample_size() as u8, me_socks_kdf_policy: socks_kdf_policy_label(self.socks_kdf_policy()), quarantined_endpoints, network_path, @@ -570,6 +591,13 @@ fn bind_stale_mode_label(mode: MeBindStaleMode) -> &'static str { } } +fn writer_pick_mode_label(mode: crate::config::MeWriterPickMode) -> &'static str { + match mode { + crate::config::MeWriterPickMode::SortedRr => "sorted_rr", + crate::config::MeWriterPickMode::P2c => "p2c", + } +} + fn socks_kdf_policy_label(policy: MeSocksKdfPolicy) -> &'static str { match policy { MeSocksKdfPolicy::Strict => "strict", diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index ec199fd..79cfa54 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -9,7 +9,7 @@ use bytes::Bytes; use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, warn}; -use crate::config::MeRouteNoWriterMode; +use crate::config::{MeRouteNoWriterMode, MeWriterPickMode}; use crate::error::{ProxyError, Result}; use crate::network::IpFamily; use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32}; @@ -24,6 +24,10 @@ use super::registry::ConnMeta; const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45; const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55; const HYBRID_GLOBAL_BURST_PERIOD_ROUNDS: u32 = 4; +const PICK_PENALTY_WARM: u64 = 200; +const PICK_PENALTY_DRAINING: u64 = 600; +const PICK_PENALTY_STALE: u64 = 300; +const PICK_PENALTY_DEGRADED: u64 = 250; impl MePool { /// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default. @@ -181,6 +185,7 @@ impl MePool { .await; } if candidate_indices.is_empty() { + let pick_mode = self.writer_pick_mode(); match no_writer_mode { MeRouteNoWriterMode::AsyncRecoveryFailfast => { let deadline = *no_writer_deadline.get_or_insert_with(|| { @@ -196,6 +201,7 @@ impl MePool { if self.wait_for_candidate_until(routed_dc, deadline).await { continue; } + self.stats.increment_me_writer_pick_no_candidate_total(pick_mode); self.stats.increment_me_no_writer_failfast_total(); return Err(ProxyError::Proxy( "No ME writers available for target DC in failfast window".into(), @@ -209,10 +215,12 @@ impl MePool { if self.wait_for_candidate_until(routed_dc, deadline).await { continue; } + self.stats.increment_me_writer_pick_no_candidate_total(pick_mode); self.stats.increment_me_no_writer_failfast_total(); return Err(ProxyError::Proxy("No ME writers available for target DC".into())); } if emergency_attempts >= self.me_route_inline_recovery_attempts.max(1) { + self.stats.increment_me_writer_pick_no_candidate_total(pick_mode); self.stats.increment_me_no_writer_failfast_total(); return Err(ProxyError::Proxy("No ME writers available for target DC".into())); } @@ -237,6 +245,7 @@ impl MePool { .await; } if candidate_indices.is_empty() { + self.stats.increment_me_writer_pick_no_candidate_total(pick_mode); return Err(ProxyError::Proxy("No ME writers available for target DC".into())); } } @@ -259,6 +268,8 @@ impl MePool { } } hybrid_wait_current = hybrid_wait_step; + let pick_mode = self.writer_pick_mode(); + let pick_sample_size = self.writer_pick_sample_size(); let writer_ids: Vec = candidate_indices .iter() .map(|idx| writers_snapshot[*idx].id) @@ -268,69 +279,84 @@ impl MePool { .writer_idle_since_for_writer_ids(&writer_ids) .await; let now_epoch_secs = Self::now_epoch_secs(); - - if self.me_deterministic_writer_sort.load(Ordering::Relaxed) { - candidate_indices.sort_by(|lhs, rhs| { - let left = &writers_snapshot[*lhs]; - let right = &writers_snapshot[*rhs]; - let left_key = ( - self.writer_contour_rank_for_selection(left), - (left.generation < self.current_generation()) as usize, - left.degraded.load(Ordering::Relaxed) as usize, - self.writer_idle_rank_for_selection( - left, - &writer_idle_since, - now_epoch_secs, - ), - Reverse(left.tx.capacity()), - left.addr, - left.id, - ); - let right_key = ( - self.writer_contour_rank_for_selection(right), - (right.generation < self.current_generation()) as usize, - right.degraded.load(Ordering::Relaxed) as usize, - self.writer_idle_rank_for_selection( - right, - &writer_idle_since, - now_epoch_secs, - ), - Reverse(right.tx.capacity()), - right.addr, - right.id, - ); - left_key.cmp(&right_key) - }); - } else { - candidate_indices.sort_by_key(|idx| { - let w = &writers_snapshot[*idx]; - let degraded = w.degraded.load(Ordering::Relaxed); - let stale = (w.generation < self.current_generation()) as usize; - ( - self.writer_contour_rank_for_selection(w), - stale, - degraded as usize, - self.writer_idle_rank_for_selection( - w, - &writer_idle_since, - now_epoch_secs, - ), - Reverse(w.tx.capacity()), - ) - }); - } - let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len(); + let ordered_candidate_indices = if pick_mode == MeWriterPickMode::P2c { + self.p2c_ordered_candidate_indices( + &candidate_indices, + &writers_snapshot, + &writer_idle_since, + now_epoch_secs, + start, + pick_sample_size, + ) + } else { + if self.me_deterministic_writer_sort.load(Ordering::Relaxed) { + candidate_indices.sort_by(|lhs, rhs| { + let left = &writers_snapshot[*lhs]; + let right = &writers_snapshot[*rhs]; + let left_key = ( + self.writer_contour_rank_for_selection(left), + (left.generation < self.current_generation()) as usize, + left.degraded.load(Ordering::Relaxed) as usize, + self.writer_idle_rank_for_selection( + left, + &writer_idle_since, + now_epoch_secs, + ), + Reverse(left.tx.capacity()), + left.addr, + left.id, + ); + let right_key = ( + self.writer_contour_rank_for_selection(right), + (right.generation < self.current_generation()) as usize, + right.degraded.load(Ordering::Relaxed) as usize, + self.writer_idle_rank_for_selection( + right, + &writer_idle_since, + now_epoch_secs, + ), + Reverse(right.tx.capacity()), + right.addr, + right.id, + ); + left_key.cmp(&right_key) + }); + } else { + candidate_indices.sort_by_key(|idx| { + let w = &writers_snapshot[*idx]; + let degraded = w.degraded.load(Ordering::Relaxed); + let stale = (w.generation < self.current_generation()) as usize; + ( + self.writer_contour_rank_for_selection(w), + stale, + degraded as usize, + self.writer_idle_rank_for_selection( + w, + &writer_idle_since, + now_epoch_secs, + ), + Reverse(w.tx.capacity()), + ) + }); + } + + let mut ordered = Vec::::with_capacity(candidate_indices.len()); + for offset in 0..candidate_indices.len() { + ordered.push(candidate_indices[(start + offset) % candidate_indices.len()]); + } + ordered + }; let mut fallback_blocking_idx: Option = None; - for offset in 0..candidate_indices.len() { - let idx = candidate_indices[(start + offset) % candidate_indices.len()]; + for idx in ordered_candidate_indices { let w = &writers_snapshot[idx]; if !self.writer_accepts_new_binding(w) { continue; } match w.tx.try_send(WriterCommand::Data(payload.clone())) { Ok(()) => { + self.stats.increment_me_writer_pick_success_try_total(pick_mode); self.registry .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone()) .await; @@ -352,6 +378,7 @@ impl MePool { } } Err(TrySendError::Closed(_)) => { + self.stats.increment_me_writer_pick_closed_total(pick_mode); warn!(writer_id = w.id, "ME writer channel closed"); self.remove_writer_and_close_clients(w.id).await; continue; @@ -360,15 +387,20 @@ impl MePool { } let Some(blocking_idx) = fallback_blocking_idx else { + self.stats.increment_me_writer_pick_full_total(pick_mode); continue; }; let w = writers_snapshot[blocking_idx].clone(); if !self.writer_accepts_new_binding(&w) { + self.stats.increment_me_writer_pick_full_total(pick_mode); continue; } + self.stats.increment_me_writer_pick_blocking_fallback_total(); match w.tx.send(WriterCommand::Data(payload.clone())).await { Ok(()) => { + self.stats + .increment_me_writer_pick_success_fallback_total(pick_mode); self.registry .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone()) .await; @@ -378,6 +410,7 @@ impl MePool { return Ok(()); } Err(_) => { + self.stats.increment_me_writer_pick_closed_total(pick_mode); warn!(writer_id = w.id, "ME writer channel closed (blocking)"); self.remove_writer_and_close_clients(w.id).await; } @@ -626,4 +659,87 @@ impl MePool { 0 } } + + fn writer_pick_score( + &self, + writer: &super::pool::MeWriter, + idle_since_by_writer: &HashMap, + now_epoch_secs: u64, + ) -> u64 { + let contour_penalty = match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) { + WriterContour::Active => 0, + WriterContour::Warm => PICK_PENALTY_WARM, + WriterContour::Draining => PICK_PENALTY_DRAINING, + }; + let stale_penalty = if writer.generation < self.current_generation() { + PICK_PENALTY_STALE + } else { + 0 + }; + let degraded_penalty = if writer.degraded.load(Ordering::Relaxed) { + PICK_PENALTY_DEGRADED + } else { + 0 + }; + let idle_penalty = + (self.writer_idle_rank_for_selection(writer, idle_since_by_writer, now_epoch_secs) as u64) + * 100; + let queue_cap = self.writer_cmd_channel_capacity.max(1) as u64; + let queue_remaining = writer.tx.capacity() as u64; + let queue_used = queue_cap.saturating_sub(queue_remaining.min(queue_cap)); + let queue_util_pct = queue_used.saturating_mul(100) / queue_cap; + let queue_penalty = queue_util_pct.saturating_mul(4); + let rtt_penalty = ((writer.rtt_ema_ms_x10.load(Ordering::Relaxed) as u64).saturating_add(5) / 10) + .min(400); + + contour_penalty + .saturating_add(stale_penalty) + .saturating_add(degraded_penalty) + .saturating_add(idle_penalty) + .saturating_add(queue_penalty) + .saturating_add(rtt_penalty) + } + + fn p2c_ordered_candidate_indices( + &self, + candidate_indices: &[usize], + writers_snapshot: &[super::pool::MeWriter], + idle_since_by_writer: &HashMap, + now_epoch_secs: u64, + start: usize, + sample_size: usize, + ) -> Vec { + let total = candidate_indices.len(); + if total == 0 { + return Vec::new(); + } + + let mut sampled = Vec::::with_capacity(sample_size.min(total)); + let mut seen = HashSet::::with_capacity(total); + for offset in 0..sample_size.min(total) { + let idx = candidate_indices[(start + offset) % total]; + if seen.insert(idx) { + sampled.push(idx); + } + } + + sampled.sort_by_key(|idx| { + let writer = &writers_snapshot[*idx]; + ( + self.writer_pick_score(writer, idle_since_by_writer, now_epoch_secs), + writer.addr, + writer.id, + ) + }); + + let mut ordered = Vec::::with_capacity(total); + ordered.extend(sampled.iter().copied()); + for offset in 0..total { + let idx = candidate_indices[(start + offset) % total]; + if seen.insert(idx) { + ordered.push(idx); + } + } + ordered + } }