diff --git a/src/api/model.rs b/src/api/model.rs index fd678f6..0bc52de 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -266,6 +266,7 @@ pub(super) struct MeWritersData { pub(super) struct DcStatus { pub(super) dc: i16, pub(super) endpoints: Vec, + pub(super) endpoint_writers: Vec, pub(super) available_endpoints: usize, pub(super) available_pct: f64, pub(super) required_writers: usize, @@ -279,6 +280,12 @@ pub(super) struct DcStatus { pub(super) load: usize, } +#[derive(Serialize, Clone)] +pub(super) struct DcEndpointWriters { + pub(super) endpoint: String, + pub(super) active_writers: usize, +} + #[derive(Serialize, Clone)] pub(super) struct DcStatusData { pub(super) middle_proxy_enabled: bool, @@ -354,6 +361,8 @@ pub(super) struct MinimalMeRuntimeData { pub(super) me_single_endpoint_outage_backoff_max_ms: u64, pub(super) me_single_endpoint_shadow_rotate_every_secs: u64, pub(super) me_deterministic_writer_sort: bool, + pub(super) me_writer_pick_mode: &'static str, + pub(super) me_writer_pick_sample_size: u8, pub(super) me_socks_kdf_policy: &'static str, pub(super) quarantined_endpoints_total: usize, pub(super) quarantined_endpoints: Vec, diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index f90abe3..139a4c5 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -7,10 +7,10 @@ use crate::transport::UpstreamRouteKind; use super::ApiShared; use super::model::{ - DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary, MinimalAllData, - MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData, MinimalQuarantineData, - UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData, ZeroAllData, - ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData, + DcEndpointWriters, DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary, + MinimalAllData, MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData, + MinimalQuarantineData, UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData, + ZeroAllData, ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData, ZeroUpstreamData, }; @@ -346,6 +346,14 @@ async fn get_minimal_payload_cached( .into_iter() .map(|value| value.to_string()) .collect(), + endpoint_writers: entry + .endpoint_writers + .into_iter() + .map(|coverage| DcEndpointWriters { + endpoint: coverage.endpoint.to_string(), + active_writers: coverage.active_writers, + }) + .collect(), available_endpoints: entry.available_endpoints, available_pct: entry.available_pct, required_writers: entry.required_writers, @@ -422,6 +430,8 @@ async fn get_minimal_payload_cached( me_single_endpoint_shadow_rotate_every_secs: runtime .me_single_endpoint_shadow_rotate_every_secs, me_deterministic_writer_sort: runtime.me_deterministic_writer_sort, + me_writer_pick_mode: runtime.me_writer_pick_mode, + me_writer_pick_sample_size: runtime.me_writer_pick_sample_size, me_socks_kdf_policy: runtime.me_socks_kdf_policy, quarantined_endpoints_total: runtime.quarantined_endpoints.len(), quarantined_endpoints: runtime diff --git a/src/api/runtime_zero.rs b/src/api/runtime_zero.rs index 5020705..7d3d778 100644 --- a/src/api/runtime_zero.rs +++ b/src/api/runtime_zero.rs @@ -2,7 +2,7 @@ use std::sync::atomic::Ordering; use serde::Serialize; -use crate::config::{MeFloorMode, ProxyConfig, UserMaxUniqueIpsMode}; +use crate::config::{MeFloorMode, MeWriterPickMode, ProxyConfig, UserMaxUniqueIpsMode}; use super::ApiShared; use super::runtime_init::build_runtime_startup_summary; @@ -78,6 +78,8 @@ pub(super) struct EffectiveMiddleProxyLimits { pub(super) reconnect_backoff_base_ms: u64, pub(super) reconnect_backoff_cap_ms: u64, pub(super) reconnect_fast_retry_count: u32, + pub(super) writer_pick_mode: &'static str, + pub(super) writer_pick_sample_size: u8, pub(super) me2dc_fallback: bool, } @@ -237,6 +239,8 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms, reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms, reconnect_fast_retry_count: cfg.general.me_reconnect_fast_retry_count, + writer_pick_mode: me_writer_pick_mode_label(cfg.general.me_writer_pick_mode), + writer_pick_sample_size: cfg.general.me_writer_pick_sample_size, me2dc_fallback: cfg.general.me2dc_fallback, }, user_ip_policy: EffectiveUserIpPolicyLimits { @@ -274,3 +278,10 @@ fn me_floor_mode_label(mode: MeFloorMode) -> &'static str { MeFloorMode::Adaptive => "adaptive", } } + +fn me_writer_pick_mode_label(mode: MeWriterPickMode) -> &'static str { + match mode { + MeWriterPickMode::SortedRr => "sorted_rr", + MeWriterPickMode::P2c => "p2c", + } +} diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 798d881..68dd71e 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -21,9 +21,10 @@ const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_PER_CORE: u16 = 64; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_PER_CORE: u16 = 64; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_GLOBAL: u32 = 256; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL: u32 = 256; -const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 512; +const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 1024; const DEFAULT_ME_ROUTE_CHANNEL_CAPACITY: usize = 512; -const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 128; +const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 256; +const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3; const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000; const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000; const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000; @@ -315,6 +316,10 @@ pub(crate) fn default_me_c2me_channel_capacity() -> usize { DEFAULT_ME_C2ME_CHANNEL_CAPACITY } +pub(crate) fn default_me_writer_pick_sample_size() -> u8 { + DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE +} + pub(crate) fn default_me_health_interval_ms_unhealthy() -> u64 { DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index e7029a3..34b2d76 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -29,7 +29,10 @@ use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher}; use tokio::sync::{mpsc, watch}; use tracing::{error, info, warn}; -use crate::config::{LogLevel, MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel}; +use crate::config::{ + LogLevel, MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel, + MeWriterPickMode, +}; use super::load::ProxyConfig; // ── Hot fields ──────────────────────────────────────────────────────────────── @@ -57,6 +60,8 @@ pub struct HotFields { pub me_bind_stale_ttl_secs: u64, pub me_secret_atomic_snapshot: bool, pub me_deterministic_writer_sort: bool, + pub me_writer_pick_mode: MeWriterPickMode, + pub me_writer_pick_sample_size: u8, pub me_single_endpoint_shadow_writers: u8, pub me_single_endpoint_outage_mode_enabled: bool, pub me_single_endpoint_outage_disable_quarantine: bool, @@ -130,6 +135,8 @@ impl HotFields { me_bind_stale_ttl_secs: cfg.general.me_bind_stale_ttl_secs, me_secret_atomic_snapshot: cfg.general.me_secret_atomic_snapshot, me_deterministic_writer_sort: cfg.general.me_deterministic_writer_sort, + me_writer_pick_mode: cfg.general.me_writer_pick_mode, + me_writer_pick_sample_size: cfg.general.me_writer_pick_sample_size, me_single_endpoint_shadow_writers: cfg.general.me_single_endpoint_shadow_writers, me_single_endpoint_outage_mode_enabled: cfg .general @@ -292,6 +299,8 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig { cfg.general.me_bind_stale_ttl_secs = new.general.me_bind_stale_ttl_secs; cfg.general.me_secret_atomic_snapshot = new.general.me_secret_atomic_snapshot; cfg.general.me_deterministic_writer_sort = new.general.me_deterministic_writer_sort; + cfg.general.me_writer_pick_mode = new.general.me_writer_pick_mode; + cfg.general.me_writer_pick_sample_size = new.general.me_writer_pick_sample_size; cfg.general.me_single_endpoint_shadow_writers = new.general.me_single_endpoint_shadow_writers; cfg.general.me_single_endpoint_outage_mode_enabled = new.general.me_single_endpoint_outage_mode_enabled; @@ -683,11 +692,15 @@ fn log_changes( } if old_hot.me_secret_atomic_snapshot != new_hot.me_secret_atomic_snapshot || old_hot.me_deterministic_writer_sort != new_hot.me_deterministic_writer_sort + || old_hot.me_writer_pick_mode != new_hot.me_writer_pick_mode + || old_hot.me_writer_pick_sample_size != new_hot.me_writer_pick_sample_size { info!( - "config reload: me_runtime_flags: secret_atomic_snapshot={} deterministic_sort={}", + "config reload: me_runtime_flags: secret_atomic_snapshot={} deterministic_sort={} writer_pick_mode={:?} writer_pick_sample_size={}", new_hot.me_secret_atomic_snapshot, - new_hot.me_deterministic_writer_sort + new_hot.me_deterministic_writer_sort, + new_hot.me_writer_pick_mode, + new_hot.me_writer_pick_sample_size, ); } if old_hot.me_single_endpoint_shadow_writers != new_hot.me_single_endpoint_shadow_writers diff --git a/src/config/load.rs b/src/config/load.rs index c013b1a..623ec4d 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -519,6 +519,12 @@ impl ProxyConfig { )); } + if !(2..=4).contains(&config.general.me_writer_pick_sample_size) { + return Err(ProxyError::Config( + "general.me_writer_pick_sample_size must be within [2, 4]".to_string(), + )); + } + if config.general.me_route_inline_recovery_attempts == 0 { return Err(ProxyError::Config( "general.me_route_inline_recovery_attempts must be > 0".to_string(), diff --git a/src/config/types.rs b/src/config/types.rs index b2be9cf..588c82f 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -212,6 +212,32 @@ impl MeRouteNoWriterMode { } } +/// Middle-End writer selection mode for new client bindings. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum MeWriterPickMode { + SortedRr, + #[default] + P2c, +} + +impl MeWriterPickMode { + pub fn as_u8(self) -> u8 { + match self { + MeWriterPickMode::SortedRr => 0, + MeWriterPickMode::P2c => 1, + } + } + + pub fn from_u8(raw: u8) -> Self { + match raw { + 0 => MeWriterPickMode::SortedRr, + 1 => MeWriterPickMode::P2c, + _ => MeWriterPickMode::P2c, + } + } +} + /// Per-user unique source IP limit mode. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(rename_all = "snake_case")] @@ -782,6 +808,14 @@ pub struct GeneralConfig { #[serde(default = "default_me_deterministic_writer_sort")] pub me_deterministic_writer_sort: bool, + /// Writer selection mode for ME route bind path. + #[serde(default)] + pub me_writer_pick_mode: MeWriterPickMode, + + /// Number of candidates sampled by writer picker in `p2c` mode. + #[serde(default = "default_me_writer_pick_sample_size")] + pub me_writer_pick_sample_size: u8, + /// Enable NTP drift check at startup. #[serde(default = "default_ntp_check")] pub ntp_check: bool, @@ -912,6 +946,8 @@ impl Default for GeneralConfig { me_reinit_trigger_channel: default_me_reinit_trigger_channel(), me_reinit_coalesce_window_ms: default_me_reinit_coalesce_window_ms(), me_deterministic_writer_sort: default_me_deterministic_writer_sort(), + me_writer_pick_mode: MeWriterPickMode::default(), + me_writer_pick_sample_size: default_me_writer_pick_sample_size(), ntp_check: default_ntp_check(), ntp_servers: default_ntp_servers(), auto_degradation_enabled: default_true(), diff --git a/src/main.rs b/src/main.rs index c1059c3..d28fabe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1047,6 +1047,8 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_bind_stale_ttl_secs, config.general.me_secret_atomic_snapshot, config.general.me_deterministic_writer_sort, + config.general.me_writer_pick_mode, + config.general.me_writer_pick_sample_size, config.general.me_socks_kdf_policy, config.general.me_writer_cmd_channel_capacity, config.general.me_route_channel_capacity, diff --git a/src/metrics.rs b/src/metrics.rs index b338df5..917c9b3 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -689,6 +689,135 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_me_writer_pick_total ME writer-pick outcomes by mode and result" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_pick_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"success_try\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_sorted_rr_success_try_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"success_fallback\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_sorted_rr_success_fallback_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"full\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_sorted_rr_full_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"closed\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_sorted_rr_closed_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"no_candidate\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_sorted_rr_no_candidate_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"p2c\",result=\"success_try\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_p2c_success_try_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"p2c\",result=\"success_fallback\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_p2c_success_fallback_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"p2c\",result=\"full\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_p2c_full_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"p2c\",result=\"closed\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_p2c_closed_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"p2c\",result=\"no_candidate\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_p2c_no_candidate_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_pick_blocking_fallback_total ME writer-pick blocking fallback attempts" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_writer_pick_blocking_fallback_total counter" + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_blocking_fallback_total {}", + if me_allows_normal { + stats.get_me_writer_pick_blocking_fallback_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_pick_mode_switch_total Writer-pick mode switches via runtime updates" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_pick_mode_switch_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_pick_mode_switch_total {}", + if me_allows_normal { + stats.get_me_writer_pick_mode_switch_total() + } else { + 0 + } + ); + let _ = writeln!( out, "# HELP telemt_me_socks_kdf_policy_total SOCKS KDF policy outcomes" diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 10d8882..25905b2 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -16,7 +16,7 @@ use std::collections::hash_map::DefaultHasher; use std::collections::VecDeque; use tracing::debug; -use crate::config::MeTelemetryLevel; +use crate::config::{MeTelemetryLevel, MeWriterPickMode}; use self::telemetry::TelemetryPolicy; // ============= Stats ============= @@ -95,6 +95,18 @@ pub struct Stats { me_route_drop_queue_full: AtomicU64, me_route_drop_queue_full_base: AtomicU64, me_route_drop_queue_full_high: AtomicU64, + me_writer_pick_sorted_rr_success_try_total: AtomicU64, + me_writer_pick_sorted_rr_success_fallback_total: AtomicU64, + me_writer_pick_sorted_rr_full_total: AtomicU64, + me_writer_pick_sorted_rr_closed_total: AtomicU64, + me_writer_pick_sorted_rr_no_candidate_total: AtomicU64, + me_writer_pick_p2c_success_try_total: AtomicU64, + me_writer_pick_p2c_success_fallback_total: AtomicU64, + me_writer_pick_p2c_full_total: AtomicU64, + me_writer_pick_p2c_closed_total: AtomicU64, + me_writer_pick_p2c_no_candidate_total: AtomicU64, + me_writer_pick_blocking_fallback_total: AtomicU64, + me_writer_pick_mode_switch_total: AtomicU64, me_socks_kdf_strict_reject: AtomicU64, me_socks_kdf_compat_fallback: AtomicU64, secure_padding_invalid: AtomicU64, @@ -497,6 +509,93 @@ impl Stats { self.me_route_drop_queue_full_high.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_writer_pick_success_try_total(&self, mode: MeWriterPickMode) { + if !self.telemetry_me_allows_normal() { + return; + } + match mode { + MeWriterPickMode::SortedRr => { + self.me_writer_pick_sorted_rr_success_try_total + .fetch_add(1, Ordering::Relaxed); + } + MeWriterPickMode::P2c => { + self.me_writer_pick_p2c_success_try_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_writer_pick_success_fallback_total(&self, mode: MeWriterPickMode) { + if !self.telemetry_me_allows_normal() { + return; + } + match mode { + MeWriterPickMode::SortedRr => { + self.me_writer_pick_sorted_rr_success_fallback_total + .fetch_add(1, Ordering::Relaxed); + } + MeWriterPickMode::P2c => { + self.me_writer_pick_p2c_success_fallback_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_writer_pick_full_total(&self, mode: MeWriterPickMode) { + if !self.telemetry_me_allows_normal() { + return; + } + match mode { + MeWriterPickMode::SortedRr => { + self.me_writer_pick_sorted_rr_full_total + .fetch_add(1, Ordering::Relaxed); + } + MeWriterPickMode::P2c => { + self.me_writer_pick_p2c_full_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_writer_pick_closed_total(&self, mode: MeWriterPickMode) { + if !self.telemetry_me_allows_normal() { + return; + } + match mode { + MeWriterPickMode::SortedRr => { + self.me_writer_pick_sorted_rr_closed_total + .fetch_add(1, Ordering::Relaxed); + } + MeWriterPickMode::P2c => { + self.me_writer_pick_p2c_closed_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_writer_pick_no_candidate_total(&self, mode: MeWriterPickMode) { + if !self.telemetry_me_allows_normal() { + return; + } + match mode { + MeWriterPickMode::SortedRr => { + self.me_writer_pick_sorted_rr_no_candidate_total + .fetch_add(1, Ordering::Relaxed); + } + MeWriterPickMode::P2c => { + self.me_writer_pick_p2c_no_candidate_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_writer_pick_blocking_fallback_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_pick_blocking_fallback_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_pick_mode_switch_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_pick_mode_switch_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_socks_kdf_strict_reject(&self) { if self.telemetry_me_allows_normal() { self.me_socks_kdf_strict_reject.fetch_add(1, Ordering::Relaxed); @@ -1001,6 +1100,52 @@ impl Stats { pub fn get_me_route_drop_queue_full_high(&self) -> u64 { self.me_route_drop_queue_full_high.load(Ordering::Relaxed) } + pub fn get_me_writer_pick_sorted_rr_success_try_total(&self) -> u64 { + self.me_writer_pick_sorted_rr_success_try_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_sorted_rr_success_fallback_total(&self) -> u64 { + self.me_writer_pick_sorted_rr_success_fallback_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_sorted_rr_full_total(&self) -> u64 { + self.me_writer_pick_sorted_rr_full_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_sorted_rr_closed_total(&self) -> u64 { + self.me_writer_pick_sorted_rr_closed_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_sorted_rr_no_candidate_total(&self) -> u64 { + self.me_writer_pick_sorted_rr_no_candidate_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_p2c_success_try_total(&self) -> u64 { + self.me_writer_pick_p2c_success_try_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_p2c_success_fallback_total(&self) -> u64 { + self.me_writer_pick_p2c_success_fallback_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_p2c_full_total(&self) -> u64 { + self.me_writer_pick_p2c_full_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_p2c_closed_total(&self) -> u64 { + self.me_writer_pick_p2c_closed_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_p2c_no_candidate_total(&self) -> u64 { + self.me_writer_pick_p2c_no_candidate_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_blocking_fallback_total(&self) -> u64 { + self.me_writer_pick_blocking_fallback_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_mode_switch_total(&self) -> u64 { + self.me_writer_pick_mode_switch_total + .load(Ordering::Relaxed) + } pub fn get_me_socks_kdf_strict_reject(&self) -> u64 { self.me_socks_kdf_strict_reject.load(Ordering::Relaxed) } diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 1bcda14..2c6a07a 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -306,6 +306,8 @@ async fn run_update_cycle( cfg.general.me_bind_stale_ttl_secs, cfg.general.me_secret_atomic_snapshot, cfg.general.me_deterministic_writer_sort, + cfg.general.me_writer_pick_mode, + cfg.general.me_writer_pick_sample_size, cfg.general.me_single_endpoint_shadow_writers, cfg.general.me_single_endpoint_outage_mode_enabled, cfg.general.me_single_endpoint_outage_disable_quarantine, @@ -530,6 +532,8 @@ pub async fn me_config_updater( cfg.general.me_bind_stale_ttl_secs, cfg.general.me_secret_atomic_snapshot, cfg.general.me_deterministic_writer_sort, + cfg.general.me_writer_pick_mode, + cfg.general.me_writer_pick_sample_size, cfg.general.me_single_endpoint_shadow_writers, cfg.general.me_single_endpoint_outage_mode_enabled, cfg.general.me_single_endpoint_outage_disable_quarantine, diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 0c9c30c..07ad67b 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -7,7 +7,9 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::{Mutex, Notify, RwLock, mpsc}; use tokio_util::sync::CancellationToken; -use crate::config::{MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy}; +use crate::config::{ + MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode, +}; use crate::crypto::SecureRandom; use crate::network::IpFamily; use crate::network::probe::NetworkDecision; @@ -39,6 +41,7 @@ pub struct MeWriter { pub tx: mpsc::Sender, pub cancel: CancellationToken, pub degraded: Arc, + pub rtt_ema_ms_x10: Arc, pub draining: Arc, pub draining_started_at_epoch_secs: Arc, pub drain_deadline_epoch_secs: Arc, @@ -177,6 +180,8 @@ pub struct MePool { pub(super) me_bind_stale_ttl_secs: AtomicU64, pub(super) secret_atomic_snapshot: AtomicBool, pub(super) me_deterministic_writer_sort: AtomicBool, + pub(super) me_writer_pick_mode: AtomicU8, + pub(super) me_writer_pick_sample_size: AtomicU8, pub(super) me_socks_kdf_policy: AtomicU8, pub(super) me_route_no_writer_mode: AtomicU8, pub(super) me_route_no_writer_wait: Duration, @@ -274,6 +279,8 @@ impl MePool { me_bind_stale_ttl_secs: u64, me_secret_atomic_snapshot: bool, me_deterministic_writer_sort: bool, + me_writer_pick_mode: MeWriterPickMode, + me_writer_pick_sample_size: u8, me_socks_kdf_policy: MeSocksKdfPolicy, me_writer_cmd_channel_capacity: usize, me_route_channel_capacity: usize, @@ -450,6 +457,8 @@ impl MePool { me_bind_stale_ttl_secs: AtomicU64::new(me_bind_stale_ttl_secs), secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot), me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort), + me_writer_pick_mode: AtomicU8::new(me_writer_pick_mode.as_u8()), + me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)), me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()), me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), @@ -489,6 +498,8 @@ impl MePool { bind_stale_ttl_secs: u64, secret_atomic_snapshot: bool, deterministic_writer_sort: bool, + writer_pick_mode: MeWriterPickMode, + writer_pick_sample_size: u8, single_endpoint_shadow_writers: u8, single_endpoint_outage_mode_enabled: bool, single_endpoint_outage_disable_quarantine: bool, @@ -535,6 +546,14 @@ impl MePool { .store(secret_atomic_snapshot, Ordering::Relaxed); self.me_deterministic_writer_sort .store(deterministic_writer_sort, Ordering::Relaxed); + let previous_writer_pick_mode = self.writer_pick_mode(); + self.me_writer_pick_mode + .store(writer_pick_mode.as_u8(), Ordering::Relaxed); + self.me_writer_pick_sample_size + .store(writer_pick_sample_size.clamp(2, 4), Ordering::Relaxed); + if previous_writer_pick_mode != writer_pick_mode { + self.stats.increment_me_writer_pick_mode_switch_total(); + } self.me_single_endpoint_shadow_writers .store(single_endpoint_shadow_writers, Ordering::Relaxed); self.me_single_endpoint_outage_mode_enabled @@ -692,6 +711,16 @@ impl MePool { MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed)) } + pub(super) fn writer_pick_mode(&self) -> MeWriterPickMode { + MeWriterPickMode::from_u8(self.me_writer_pick_mode.load(Ordering::Relaxed)) + } + + pub(super) fn writer_pick_sample_size(&self) -> usize { + self.me_writer_pick_sample_size + .load(Ordering::Relaxed) + .clamp(2, 4) as usize + } + pub(super) fn required_writers_for_dc(&self, endpoint_count: usize) -> usize { if endpoint_count == 0 { return 0; diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 544d048..fc916f4 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::Ordering; @@ -113,10 +113,35 @@ impl MePool { contour: WriterContour, allow_coverage_override: bool, ) -> bool { - let candidates = self.connectable_endpoints(endpoints).await; + let mut candidates = self.connectable_endpoints(endpoints).await; if candidates.is_empty() { return false; } + if candidates.len() > 1 { + let mut active_by_endpoint = HashMap::::new(); + let ws = self.writers.read().await; + for writer in ws.iter() { + if writer.draining.load(Ordering::Relaxed) { + continue; + } + if writer.writer_dc != dc { + continue; + } + if !matches!( + super::pool::WriterContour::from_u8( + writer.contour.load(Ordering::Relaxed), + ), + super::pool::WriterContour::Active + ) { + continue; + } + if candidates.contains(&writer.addr) { + *active_by_endpoint.entry(writer.addr).or_insert(0) += 1; + } + } + drop(ws); + candidates.sort_by_key(|addr| (active_by_endpoint.get(addr).copied().unwrap_or(0), *addr)); + } let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % candidates.len(); for offset in 0..candidates.len() { let idx = (start + offset) % candidates.len(); diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index cc1be5b..6673cf2 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -25,6 +25,7 @@ pub(crate) struct MeApiWriterStatusSnapshot { pub(crate) struct MeApiDcStatusSnapshot { pub dc: i16, pub endpoints: Vec, + pub endpoint_writers: Vec, pub available_endpoints: usize, pub available_pct: f64, pub required_writers: usize, @@ -38,6 +39,12 @@ pub(crate) struct MeApiDcStatusSnapshot { pub load: usize, } +#[derive(Clone, Debug)] +pub(crate) struct MeApiDcEndpointWriterSnapshot { + pub endpoint: SocketAddr, + pub active_writers: usize, +} + #[derive(Clone, Debug)] pub(crate) struct MeApiStatusSnapshot { pub generated_at_epoch_secs: u64, @@ -118,6 +125,8 @@ pub(crate) struct MeApiRuntimeSnapshot { pub me_single_endpoint_outage_backoff_max_ms: u64, pub me_single_endpoint_shadow_rotate_every_secs: u64, pub me_deterministic_writer_sort: bool, + pub me_writer_pick_mode: &'static str, + pub me_writer_pick_sample_size: u8, pub me_socks_kdf_policy: &'static str, pub quarantined_endpoints: Vec, pub network_path: Vec, @@ -338,6 +347,16 @@ impl MePool { dcs.push(MeApiDcStatusSnapshot { dc, + endpoint_writers: endpoints + .iter() + .map(|endpoint| MeApiDcEndpointWriterSnapshot { + endpoint: *endpoint, + active_writers: live_writers_by_dc_endpoint + .get(&(dc, *endpoint)) + .copied() + .unwrap_or(0), + }) + .collect(), endpoints: endpoints.into_iter().collect(), available_endpoints: dc_available_endpoints, available_pct: ratio_pct(dc_available_endpoints, endpoint_count), @@ -522,6 +541,8 @@ impl MePool { me_deterministic_writer_sort: self .me_deterministic_writer_sort .load(Ordering::Relaxed), + me_writer_pick_mode: writer_pick_mode_label(self.writer_pick_mode()), + me_writer_pick_sample_size: self.writer_pick_sample_size() as u8, me_socks_kdf_policy: socks_kdf_policy_label(self.socks_kdf_policy()), quarantined_endpoints, network_path, @@ -570,6 +591,13 @@ fn bind_stale_mode_label(mode: MeBindStaleMode) -> &'static str { } } +fn writer_pick_mode_label(mode: crate::config::MeWriterPickMode) -> &'static str { + match mode { + crate::config::MeWriterPickMode::SortedRr => "sorted_rr", + crate::config::MeWriterPickMode::P2c => "p2c", + } +} + fn socks_kdf_policy_label(policy: MeSocksKdfPolicy) -> &'static str { match policy { MeSocksKdfPolicy::Strict => "strict", diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 036572a..43abf0c 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; use std::time::{Duration, Instant}; use std::io::ErrorKind; @@ -128,6 +128,7 @@ impl MePool { let contour = Arc::new(AtomicU8::new(contour.as_u8())); let cancel = CancellationToken::new(); let degraded = Arc::new(AtomicBool::new(false)); + let rtt_ema_ms_x10 = Arc::new(AtomicU32::new(0)); let draining = Arc::new(AtomicBool::new(false)); let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0)); let drain_deadline_epoch_secs = Arc::new(AtomicU64::new(0)); @@ -169,6 +170,7 @@ impl MePool { tx: tx.clone(), cancel: cancel.clone(), degraded: degraded.clone(), + rtt_ema_ms_x10: rtt_ema_ms_x10.clone(), draining: draining.clone(), draining_started_at_epoch_secs: draining_started_at_epoch_secs.clone(), drain_deadline_epoch_secs: drain_deadline_epoch_secs.clone(), @@ -222,6 +224,7 @@ impl MePool { stats_reader, writer_id, degraded.clone(), + rtt_ema_ms_x10.clone(), cancel_reader_token.clone(), ) .await; diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index 61bd69c..32de774 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::io::ErrorKind; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::time::Instant; use bytes::{Bytes, BytesMut}; @@ -34,6 +34,7 @@ pub(crate) async fn reader_loop( stats: Arc, _writer_id: u64, degraded: Arc, + writer_rtt_ema_ms_x10: Arc, cancel: CancellationToken, ) -> Result<()> { let mut raw = enc_leftover; @@ -208,6 +209,8 @@ pub(crate) async fn reader_loop( } let degraded_now = entry.1 > entry.0 * 2.0; degraded.store(degraded_now, Ordering::Relaxed); + writer_rtt_ema_ms_x10 + .store((entry.1 * 10.0).clamp(0.0, u32::MAX as f64) as u32, Ordering::Relaxed); trace!(writer_id = wid, rtt_ms = rtt, ema_ms = entry.1, base_ms = entry.0, degraded = degraded_now, "ME RTT sample"); } } else { diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index ec199fd..79cfa54 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -9,7 +9,7 @@ use bytes::Bytes; use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, warn}; -use crate::config::MeRouteNoWriterMode; +use crate::config::{MeRouteNoWriterMode, MeWriterPickMode}; use crate::error::{ProxyError, Result}; use crate::network::IpFamily; use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32}; @@ -24,6 +24,10 @@ use super::registry::ConnMeta; const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45; const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55; const HYBRID_GLOBAL_BURST_PERIOD_ROUNDS: u32 = 4; +const PICK_PENALTY_WARM: u64 = 200; +const PICK_PENALTY_DRAINING: u64 = 600; +const PICK_PENALTY_STALE: u64 = 300; +const PICK_PENALTY_DEGRADED: u64 = 250; impl MePool { /// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default. @@ -181,6 +185,7 @@ impl MePool { .await; } if candidate_indices.is_empty() { + let pick_mode = self.writer_pick_mode(); match no_writer_mode { MeRouteNoWriterMode::AsyncRecoveryFailfast => { let deadline = *no_writer_deadline.get_or_insert_with(|| { @@ -196,6 +201,7 @@ impl MePool { if self.wait_for_candidate_until(routed_dc, deadline).await { continue; } + self.stats.increment_me_writer_pick_no_candidate_total(pick_mode); self.stats.increment_me_no_writer_failfast_total(); return Err(ProxyError::Proxy( "No ME writers available for target DC in failfast window".into(), @@ -209,10 +215,12 @@ impl MePool { if self.wait_for_candidate_until(routed_dc, deadline).await { continue; } + self.stats.increment_me_writer_pick_no_candidate_total(pick_mode); self.stats.increment_me_no_writer_failfast_total(); return Err(ProxyError::Proxy("No ME writers available for target DC".into())); } if emergency_attempts >= self.me_route_inline_recovery_attempts.max(1) { + self.stats.increment_me_writer_pick_no_candidate_total(pick_mode); self.stats.increment_me_no_writer_failfast_total(); return Err(ProxyError::Proxy("No ME writers available for target DC".into())); } @@ -237,6 +245,7 @@ impl MePool { .await; } if candidate_indices.is_empty() { + self.stats.increment_me_writer_pick_no_candidate_total(pick_mode); return Err(ProxyError::Proxy("No ME writers available for target DC".into())); } } @@ -259,6 +268,8 @@ impl MePool { } } hybrid_wait_current = hybrid_wait_step; + let pick_mode = self.writer_pick_mode(); + let pick_sample_size = self.writer_pick_sample_size(); let writer_ids: Vec = candidate_indices .iter() .map(|idx| writers_snapshot[*idx].id) @@ -268,69 +279,84 @@ impl MePool { .writer_idle_since_for_writer_ids(&writer_ids) .await; let now_epoch_secs = Self::now_epoch_secs(); - - if self.me_deterministic_writer_sort.load(Ordering::Relaxed) { - candidate_indices.sort_by(|lhs, rhs| { - let left = &writers_snapshot[*lhs]; - let right = &writers_snapshot[*rhs]; - let left_key = ( - self.writer_contour_rank_for_selection(left), - (left.generation < self.current_generation()) as usize, - left.degraded.load(Ordering::Relaxed) as usize, - self.writer_idle_rank_for_selection( - left, - &writer_idle_since, - now_epoch_secs, - ), - Reverse(left.tx.capacity()), - left.addr, - left.id, - ); - let right_key = ( - self.writer_contour_rank_for_selection(right), - (right.generation < self.current_generation()) as usize, - right.degraded.load(Ordering::Relaxed) as usize, - self.writer_idle_rank_for_selection( - right, - &writer_idle_since, - now_epoch_secs, - ), - Reverse(right.tx.capacity()), - right.addr, - right.id, - ); - left_key.cmp(&right_key) - }); - } else { - candidate_indices.sort_by_key(|idx| { - let w = &writers_snapshot[*idx]; - let degraded = w.degraded.load(Ordering::Relaxed); - let stale = (w.generation < self.current_generation()) as usize; - ( - self.writer_contour_rank_for_selection(w), - stale, - degraded as usize, - self.writer_idle_rank_for_selection( - w, - &writer_idle_since, - now_epoch_secs, - ), - Reverse(w.tx.capacity()), - ) - }); - } - let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len(); + let ordered_candidate_indices = if pick_mode == MeWriterPickMode::P2c { + self.p2c_ordered_candidate_indices( + &candidate_indices, + &writers_snapshot, + &writer_idle_since, + now_epoch_secs, + start, + pick_sample_size, + ) + } else { + if self.me_deterministic_writer_sort.load(Ordering::Relaxed) { + candidate_indices.sort_by(|lhs, rhs| { + let left = &writers_snapshot[*lhs]; + let right = &writers_snapshot[*rhs]; + let left_key = ( + self.writer_contour_rank_for_selection(left), + (left.generation < self.current_generation()) as usize, + left.degraded.load(Ordering::Relaxed) as usize, + self.writer_idle_rank_for_selection( + left, + &writer_idle_since, + now_epoch_secs, + ), + Reverse(left.tx.capacity()), + left.addr, + left.id, + ); + let right_key = ( + self.writer_contour_rank_for_selection(right), + (right.generation < self.current_generation()) as usize, + right.degraded.load(Ordering::Relaxed) as usize, + self.writer_idle_rank_for_selection( + right, + &writer_idle_since, + now_epoch_secs, + ), + Reverse(right.tx.capacity()), + right.addr, + right.id, + ); + left_key.cmp(&right_key) + }); + } else { + candidate_indices.sort_by_key(|idx| { + let w = &writers_snapshot[*idx]; + let degraded = w.degraded.load(Ordering::Relaxed); + let stale = (w.generation < self.current_generation()) as usize; + ( + self.writer_contour_rank_for_selection(w), + stale, + degraded as usize, + self.writer_idle_rank_for_selection( + w, + &writer_idle_since, + now_epoch_secs, + ), + Reverse(w.tx.capacity()), + ) + }); + } + + let mut ordered = Vec::::with_capacity(candidate_indices.len()); + for offset in 0..candidate_indices.len() { + ordered.push(candidate_indices[(start + offset) % candidate_indices.len()]); + } + ordered + }; let mut fallback_blocking_idx: Option = None; - for offset in 0..candidate_indices.len() { - let idx = candidate_indices[(start + offset) % candidate_indices.len()]; + for idx in ordered_candidate_indices { let w = &writers_snapshot[idx]; if !self.writer_accepts_new_binding(w) { continue; } match w.tx.try_send(WriterCommand::Data(payload.clone())) { Ok(()) => { + self.stats.increment_me_writer_pick_success_try_total(pick_mode); self.registry .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone()) .await; @@ -352,6 +378,7 @@ impl MePool { } } Err(TrySendError::Closed(_)) => { + self.stats.increment_me_writer_pick_closed_total(pick_mode); warn!(writer_id = w.id, "ME writer channel closed"); self.remove_writer_and_close_clients(w.id).await; continue; @@ -360,15 +387,20 @@ impl MePool { } let Some(blocking_idx) = fallback_blocking_idx else { + self.stats.increment_me_writer_pick_full_total(pick_mode); continue; }; let w = writers_snapshot[blocking_idx].clone(); if !self.writer_accepts_new_binding(&w) { + self.stats.increment_me_writer_pick_full_total(pick_mode); continue; } + self.stats.increment_me_writer_pick_blocking_fallback_total(); match w.tx.send(WriterCommand::Data(payload.clone())).await { Ok(()) => { + self.stats + .increment_me_writer_pick_success_fallback_total(pick_mode); self.registry .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone()) .await; @@ -378,6 +410,7 @@ impl MePool { return Ok(()); } Err(_) => { + self.stats.increment_me_writer_pick_closed_total(pick_mode); warn!(writer_id = w.id, "ME writer channel closed (blocking)"); self.remove_writer_and_close_clients(w.id).await; } @@ -626,4 +659,87 @@ impl MePool { 0 } } + + fn writer_pick_score( + &self, + writer: &super::pool::MeWriter, + idle_since_by_writer: &HashMap, + now_epoch_secs: u64, + ) -> u64 { + let contour_penalty = match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) { + WriterContour::Active => 0, + WriterContour::Warm => PICK_PENALTY_WARM, + WriterContour::Draining => PICK_PENALTY_DRAINING, + }; + let stale_penalty = if writer.generation < self.current_generation() { + PICK_PENALTY_STALE + } else { + 0 + }; + let degraded_penalty = if writer.degraded.load(Ordering::Relaxed) { + PICK_PENALTY_DEGRADED + } else { + 0 + }; + let idle_penalty = + (self.writer_idle_rank_for_selection(writer, idle_since_by_writer, now_epoch_secs) as u64) + * 100; + let queue_cap = self.writer_cmd_channel_capacity.max(1) as u64; + let queue_remaining = writer.tx.capacity() as u64; + let queue_used = queue_cap.saturating_sub(queue_remaining.min(queue_cap)); + let queue_util_pct = queue_used.saturating_mul(100) / queue_cap; + let queue_penalty = queue_util_pct.saturating_mul(4); + let rtt_penalty = ((writer.rtt_ema_ms_x10.load(Ordering::Relaxed) as u64).saturating_add(5) / 10) + .min(400); + + contour_penalty + .saturating_add(stale_penalty) + .saturating_add(degraded_penalty) + .saturating_add(idle_penalty) + .saturating_add(queue_penalty) + .saturating_add(rtt_penalty) + } + + fn p2c_ordered_candidate_indices( + &self, + candidate_indices: &[usize], + writers_snapshot: &[super::pool::MeWriter], + idle_since_by_writer: &HashMap, + now_epoch_secs: u64, + start: usize, + sample_size: usize, + ) -> Vec { + let total = candidate_indices.len(); + if total == 0 { + return Vec::new(); + } + + let mut sampled = Vec::::with_capacity(sample_size.min(total)); + let mut seen = HashSet::::with_capacity(total); + for offset in 0..sample_size.min(total) { + let idx = candidate_indices[(start + offset) % total]; + if seen.insert(idx) { + sampled.push(idx); + } + } + + sampled.sort_by_key(|idx| { + let writer = &writers_snapshot[*idx]; + ( + self.writer_pick_score(writer, idle_since_by_writer, now_epoch_secs), + writer.addr, + writer.id, + ) + }); + + let mut ordered = Vec::::with_capacity(total); + ordered.extend(sampled.iter().copied()); + for offset in 0..total { + let idx = candidate_indices[(start + offset) % total]; + if seen.insert(idx) { + ordered.push(idx); + } + } + ordered + } }