ME Writer Pick + Active-by-Endpoint: merge pull request #369 from telemt/flow-pick

ME Writer Pick + Active-by-Endpoint
Alexey 2026-03-08 03:07:38 +03:00 committed by GitHub
commit 4fd22b3219
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 645 additions and 71 deletions

View File

@ -266,6 +266,7 @@ pub(super) struct MeWritersData {
pub(super) struct DcStatus {
pub(super) dc: i16,
pub(super) endpoints: Vec<String>,
pub(super) endpoint_writers: Vec<DcEndpointWriters>,
pub(super) available_endpoints: usize,
pub(super) available_pct: f64,
pub(super) required_writers: usize,
@ -279,6 +280,12 @@ pub(super) struct DcStatus {
pub(super) load: usize,
}
#[derive(Serialize, Clone)]
pub(super) struct DcEndpointWriters {
pub(super) endpoint: String,
pub(super) active_writers: usize,
}
#[derive(Serialize, Clone)]
pub(super) struct DcStatusData {
pub(super) middle_proxy_enabled: bool,
@ -354,6 +361,8 @@ pub(super) struct MinimalMeRuntimeData {
pub(super) me_single_endpoint_outage_backoff_max_ms: u64,
pub(super) me_single_endpoint_shadow_rotate_every_secs: u64,
pub(super) me_deterministic_writer_sort: bool,
pub(super) me_writer_pick_mode: &'static str,
pub(super) me_writer_pick_sample_size: u8,
pub(super) me_socks_kdf_policy: &'static str,
pub(super) quarantined_endpoints_total: usize,
pub(super) quarantined_endpoints: Vec<MinimalQuarantineData>,

View File

@ -7,10 +7,10 @@ use crate::transport::UpstreamRouteKind;
use super::ApiShared;
use super::model::{
-DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary, MinimalAllData,
-MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData, MinimalQuarantineData,
-UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData, ZeroAllData,
-ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData,
+DcEndpointWriters, DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary,
+MinimalAllData, MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData,
+MinimalQuarantineData, UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData,
+ZeroAllData, ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData,
ZeroUpstreamData,
};
@ -346,6 +346,14 @@ async fn get_minimal_payload_cached(
.into_iter()
.map(|value| value.to_string())
.collect(),
endpoint_writers: entry
.endpoint_writers
.into_iter()
.map(|coverage| DcEndpointWriters {
endpoint: coverage.endpoint.to_string(),
active_writers: coverage.active_writers,
})
.collect(),
available_endpoints: entry.available_endpoints,
available_pct: entry.available_pct,
required_writers: entry.required_writers,
@ -422,6 +430,8 @@ async fn get_minimal_payload_cached(
me_single_endpoint_shadow_rotate_every_secs: runtime
.me_single_endpoint_shadow_rotate_every_secs,
me_deterministic_writer_sort: runtime.me_deterministic_writer_sort,
me_writer_pick_mode: runtime.me_writer_pick_mode,
me_writer_pick_sample_size: runtime.me_writer_pick_sample_size,
me_socks_kdf_policy: runtime.me_socks_kdf_policy,
quarantined_endpoints_total: runtime.quarantined_endpoints.len(),
quarantined_endpoints: runtime

View File

@ -2,7 +2,7 @@ use std::sync::atomic::Ordering;
use serde::Serialize;
-use crate::config::{MeFloorMode, ProxyConfig, UserMaxUniqueIpsMode};
+use crate::config::{MeFloorMode, MeWriterPickMode, ProxyConfig, UserMaxUniqueIpsMode};
use super::ApiShared;
use super::runtime_init::build_runtime_startup_summary;
@ -78,6 +78,8 @@ pub(super) struct EffectiveMiddleProxyLimits {
pub(super) reconnect_backoff_base_ms: u64,
pub(super) reconnect_backoff_cap_ms: u64,
pub(super) reconnect_fast_retry_count: u32,
pub(super) writer_pick_mode: &'static str,
pub(super) writer_pick_sample_size: u8,
pub(super) me2dc_fallback: bool,
}
@ -237,6 +239,8 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD
reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms,
reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms,
reconnect_fast_retry_count: cfg.general.me_reconnect_fast_retry_count,
writer_pick_mode: me_writer_pick_mode_label(cfg.general.me_writer_pick_mode),
writer_pick_sample_size: cfg.general.me_writer_pick_sample_size,
me2dc_fallback: cfg.general.me2dc_fallback,
},
user_ip_policy: EffectiveUserIpPolicyLimits {
@ -274,3 +278,10 @@ fn me_floor_mode_label(mode: MeFloorMode) -> &'static str {
MeFloorMode::Adaptive => "adaptive",
}
}
fn me_writer_pick_mode_label(mode: MeWriterPickMode) -> &'static str {
match mode {
MeWriterPickMode::SortedRr => "sorted_rr",
MeWriterPickMode::P2c => "p2c",
}
}

View File

@ -21,9 +21,10 @@ const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_PER_CORE: u16 = 64;
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_PER_CORE: u16 = 64;
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_GLOBAL: u32 = 256;
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL: u32 = 256;
-const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 512;
+const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 1024;
const DEFAULT_ME_ROUTE_CHANNEL_CAPACITY: usize = 512;
-const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 128;
+const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 256;
const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3;
const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000;
const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000;
const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000;
@ -315,6 +316,10 @@ pub(crate) fn default_me_c2me_channel_capacity() -> usize {
DEFAULT_ME_C2ME_CHANNEL_CAPACITY
}
pub(crate) fn default_me_writer_pick_sample_size() -> u8 {
DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE
}
pub(crate) fn default_me_health_interval_ms_unhealthy() -> u64 {
DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY
}

View File

@ -29,7 +29,10 @@ use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher};
use tokio::sync::{mpsc, watch};
use tracing::{error, info, warn};
-use crate::config::{LogLevel, MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel};
+use crate::config::{
LogLevel, MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel,
MeWriterPickMode,
};
use super::load::ProxyConfig;
// ── Hot fields ────────────────────────────────────────────────────────────────
@ -57,6 +60,8 @@ pub struct HotFields {
pub me_bind_stale_ttl_secs: u64,
pub me_secret_atomic_snapshot: bool,
pub me_deterministic_writer_sort: bool,
pub me_writer_pick_mode: MeWriterPickMode,
pub me_writer_pick_sample_size: u8,
pub me_single_endpoint_shadow_writers: u8,
pub me_single_endpoint_outage_mode_enabled: bool,
pub me_single_endpoint_outage_disable_quarantine: bool,
@ -130,6 +135,8 @@ impl HotFields {
me_bind_stale_ttl_secs: cfg.general.me_bind_stale_ttl_secs,
me_secret_atomic_snapshot: cfg.general.me_secret_atomic_snapshot,
me_deterministic_writer_sort: cfg.general.me_deterministic_writer_sort,
me_writer_pick_mode: cfg.general.me_writer_pick_mode,
me_writer_pick_sample_size: cfg.general.me_writer_pick_sample_size,
me_single_endpoint_shadow_writers: cfg.general.me_single_endpoint_shadow_writers,
me_single_endpoint_outage_mode_enabled: cfg
.general
@ -292,6 +299,8 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
cfg.general.me_bind_stale_ttl_secs = new.general.me_bind_stale_ttl_secs;
cfg.general.me_secret_atomic_snapshot = new.general.me_secret_atomic_snapshot;
cfg.general.me_deterministic_writer_sort = new.general.me_deterministic_writer_sort;
cfg.general.me_writer_pick_mode = new.general.me_writer_pick_mode;
cfg.general.me_writer_pick_sample_size = new.general.me_writer_pick_sample_size;
cfg.general.me_single_endpoint_shadow_writers = new.general.me_single_endpoint_shadow_writers;
cfg.general.me_single_endpoint_outage_mode_enabled =
new.general.me_single_endpoint_outage_mode_enabled;
@ -683,11 +692,15 @@ fn log_changes(
}
if old_hot.me_secret_atomic_snapshot != new_hot.me_secret_atomic_snapshot
|| old_hot.me_deterministic_writer_sort != new_hot.me_deterministic_writer_sort
|| old_hot.me_writer_pick_mode != new_hot.me_writer_pick_mode
|| old_hot.me_writer_pick_sample_size != new_hot.me_writer_pick_sample_size
{
info!(
-"config reload: me_runtime_flags: secret_atomic_snapshot={} deterministic_sort={}",
+"config reload: me_runtime_flags: secret_atomic_snapshot={} deterministic_sort={} writer_pick_mode={:?} writer_pick_sample_size={}",
new_hot.me_secret_atomic_snapshot,
-new_hot.me_deterministic_writer_sort
+new_hot.me_deterministic_writer_sort,
new_hot.me_writer_pick_mode,
new_hot.me_writer_pick_sample_size,
);
}
if old_hot.me_single_endpoint_shadow_writers != new_hot.me_single_endpoint_shadow_writers

View File

@ -519,6 +519,12 @@ impl ProxyConfig {
));
}
if !(2..=4).contains(&config.general.me_writer_pick_sample_size) {
return Err(ProxyError::Config(
"general.me_writer_pick_sample_size must be within [2, 4]".to_string(),
));
}
if config.general.me_route_inline_recovery_attempts == 0 {
return Err(ProxyError::Config(
"general.me_route_inline_recovery_attempts must be > 0".to_string(),

View File

@ -212,6 +212,32 @@ impl MeRouteNoWriterMode {
}
}
/// Middle-End writer selection mode for new client bindings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MeWriterPickMode {
SortedRr,
#[default]
P2c,
}
impl MeWriterPickMode {
pub fn as_u8(self) -> u8 {
match self {
MeWriterPickMode::SortedRr => 0,
MeWriterPickMode::P2c => 1,
}
}
pub fn from_u8(raw: u8) -> Self {
match raw {
0 => MeWriterPickMode::SortedRr,
1 => MeWriterPickMode::P2c,
_ => MeWriterPickMode::P2c,
}
}
}
/// Per-user unique source IP limit mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
@ -782,6 +808,14 @@ pub struct GeneralConfig {
#[serde(default = "default_me_deterministic_writer_sort")]
pub me_deterministic_writer_sort: bool,
/// Writer selection mode for ME route bind path.
#[serde(default)]
pub me_writer_pick_mode: MeWriterPickMode,
/// Number of candidates sampled by writer picker in `p2c` mode.
#[serde(default = "default_me_writer_pick_sample_size")]
pub me_writer_pick_sample_size: u8,
/// Enable NTP drift check at startup.
#[serde(default = "default_ntp_check")]
pub ntp_check: bool,
@ -912,6 +946,8 @@ impl Default for GeneralConfig {
me_reinit_trigger_channel: default_me_reinit_trigger_channel(),
me_reinit_coalesce_window_ms: default_me_reinit_coalesce_window_ms(),
me_deterministic_writer_sort: default_me_deterministic_writer_sort(),
me_writer_pick_mode: MeWriterPickMode::default(),
me_writer_pick_sample_size: default_me_writer_pick_sample_size(),
ntp_check: default_ntp_check(),
ntp_servers: default_ntp_servers(),
auto_degradation_enabled: default_true(),
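
Because MeWriterPickMode derives Serialize/Deserialize with rename_all = "snake_case" and the field uses #[serde(default)], the two modes appear as sorted_rr / p2c in config and an omitted field falls back to P2c. A minimal standalone sketch of that behaviour, using serde_json as a stand-in for the real config format (the reduced General struct here is illustrative only):

use serde::Deserialize;

#[derive(Debug, PartialEq, Deserialize, Default, Clone, Copy)]
#[serde(rename_all = "snake_case")]
enum MeWriterPickMode {
    SortedRr,
    #[default]
    P2c,
}

#[derive(Debug, Deserialize)]
struct General {
    #[serde(default)]
    me_writer_pick_mode: MeWriterPickMode,
}

fn main() {
    // Explicit value parses from its snake_case name.
    let g: General = serde_json::from_str(r#"{"me_writer_pick_mode":"sorted_rr"}"#).unwrap();
    assert_eq!(g.me_writer_pick_mode, MeWriterPickMode::SortedRr);
    // Omitted value falls back to the enum default (p2c).
    let g: General = serde_json::from_str(r#"{}"#).unwrap();
    assert_eq!(g.me_writer_pick_mode, MeWriterPickMode::P2c);
}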

View File

@ -1047,6 +1047,8 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
config.general.me_bind_stale_ttl_secs,
config.general.me_secret_atomic_snapshot,
config.general.me_deterministic_writer_sort,
config.general.me_writer_pick_mode,
config.general.me_writer_pick_sample_size,
config.general.me_socks_kdf_policy,
config.general.me_writer_cmd_channel_capacity,
config.general.me_route_channel_capacity,

View File

@ -689,6 +689,135 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_pick_total ME writer-pick outcomes by mode and result"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_pick_total counter");
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"success_try\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_sorted_rr_success_try_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"success_fallback\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_sorted_rr_success_fallback_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"full\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_sorted_rr_full_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"closed\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_sorted_rr_closed_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"no_candidate\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_sorted_rr_no_candidate_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"success_try\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_p2c_success_try_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"success_fallback\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_p2c_success_fallback_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"full\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_p2c_full_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"closed\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_p2c_closed_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"no_candidate\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_p2c_no_candidate_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_pick_blocking_fallback_total ME writer-pick blocking fallback attempts"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_pick_blocking_fallback_total counter"
);
let _ = writeln!(
out,
"telemt_me_writer_pick_blocking_fallback_total {}",
if me_allows_normal {
stats.get_me_writer_pick_blocking_fallback_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_pick_mode_switch_total Writer-pick mode switches via runtime updates"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_pick_mode_switch_total counter");
let _ = writeln!(
out,
"telemt_me_writer_pick_mode_switch_total {}",
if me_allows_normal {
stats.get_me_writer_pick_mode_switch_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_socks_kdf_policy_total SOCKS KDF policy outcomes"

View File

@ -16,7 +16,7 @@ use std::collections::hash_map::DefaultHasher;
use std::collections::VecDeque;
use tracing::debug;
-use crate::config::MeTelemetryLevel;
+use crate::config::{MeTelemetryLevel, MeWriterPickMode};
use self::telemetry::TelemetryPolicy;
// ============= Stats =============
@ -95,6 +95,18 @@ pub struct Stats {
me_route_drop_queue_full: AtomicU64,
me_route_drop_queue_full_base: AtomicU64,
me_route_drop_queue_full_high: AtomicU64,
me_writer_pick_sorted_rr_success_try_total: AtomicU64,
me_writer_pick_sorted_rr_success_fallback_total: AtomicU64,
me_writer_pick_sorted_rr_full_total: AtomicU64,
me_writer_pick_sorted_rr_closed_total: AtomicU64,
me_writer_pick_sorted_rr_no_candidate_total: AtomicU64,
me_writer_pick_p2c_success_try_total: AtomicU64,
me_writer_pick_p2c_success_fallback_total: AtomicU64,
me_writer_pick_p2c_full_total: AtomicU64,
me_writer_pick_p2c_closed_total: AtomicU64,
me_writer_pick_p2c_no_candidate_total: AtomicU64,
me_writer_pick_blocking_fallback_total: AtomicU64,
me_writer_pick_mode_switch_total: AtomicU64,
me_socks_kdf_strict_reject: AtomicU64,
me_socks_kdf_compat_fallback: AtomicU64,
secure_padding_invalid: AtomicU64,
@ -497,6 +509,93 @@ impl Stats {
self.me_route_drop_queue_full_high.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_pick_success_try_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() {
return;
}
match mode {
MeWriterPickMode::SortedRr => {
self.me_writer_pick_sorted_rr_success_try_total
.fetch_add(1, Ordering::Relaxed);
}
MeWriterPickMode::P2c => {
self.me_writer_pick_p2c_success_try_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_writer_pick_success_fallback_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() {
return;
}
match mode {
MeWriterPickMode::SortedRr => {
self.me_writer_pick_sorted_rr_success_fallback_total
.fetch_add(1, Ordering::Relaxed);
}
MeWriterPickMode::P2c => {
self.me_writer_pick_p2c_success_fallback_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_writer_pick_full_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() {
return;
}
match mode {
MeWriterPickMode::SortedRr => {
self.me_writer_pick_sorted_rr_full_total
.fetch_add(1, Ordering::Relaxed);
}
MeWriterPickMode::P2c => {
self.me_writer_pick_p2c_full_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_writer_pick_closed_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() {
return;
}
match mode {
MeWriterPickMode::SortedRr => {
self.me_writer_pick_sorted_rr_closed_total
.fetch_add(1, Ordering::Relaxed);
}
MeWriterPickMode::P2c => {
self.me_writer_pick_p2c_closed_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_writer_pick_no_candidate_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() {
return;
}
match mode {
MeWriterPickMode::SortedRr => {
self.me_writer_pick_sorted_rr_no_candidate_total
.fetch_add(1, Ordering::Relaxed);
}
MeWriterPickMode::P2c => {
self.me_writer_pick_p2c_no_candidate_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_writer_pick_blocking_fallback_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_pick_blocking_fallback_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_pick_mode_switch_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_pick_mode_switch_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_socks_kdf_strict_reject(&self) {
if self.telemetry_me_allows_normal() {
self.me_socks_kdf_strict_reject.fetch_add(1, Ordering::Relaxed);
@ -1001,6 +1100,52 @@ impl Stats {
pub fn get_me_route_drop_queue_full_high(&self) -> u64 {
self.me_route_drop_queue_full_high.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_sorted_rr_success_try_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_success_try_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_sorted_rr_success_fallback_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_success_fallback_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_sorted_rr_full_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_full_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_sorted_rr_closed_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_closed_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_sorted_rr_no_candidate_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_no_candidate_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_p2c_success_try_total(&self) -> u64 {
self.me_writer_pick_p2c_success_try_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_p2c_success_fallback_total(&self) -> u64 {
self.me_writer_pick_p2c_success_fallback_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_p2c_full_total(&self) -> u64 {
self.me_writer_pick_p2c_full_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_p2c_closed_total(&self) -> u64 {
self.me_writer_pick_p2c_closed_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_p2c_no_candidate_total(&self) -> u64 {
self.me_writer_pick_p2c_no_candidate_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_blocking_fallback_total(&self) -> u64 {
self.me_writer_pick_blocking_fallback_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_mode_switch_total(&self) -> u64 {
self.me_writer_pick_mode_switch_total
.load(Ordering::Relaxed)
}
pub fn get_me_socks_kdf_strict_reject(&self) -> u64 {
self.me_socks_kdf_strict_reject.load(Ordering::Relaxed)
}

View File

@ -306,6 +306,8 @@ async fn run_update_cycle(
cfg.general.me_bind_stale_ttl_secs,
cfg.general.me_secret_atomic_snapshot,
cfg.general.me_deterministic_writer_sort,
cfg.general.me_writer_pick_mode,
cfg.general.me_writer_pick_sample_size,
cfg.general.me_single_endpoint_shadow_writers,
cfg.general.me_single_endpoint_outage_mode_enabled,
cfg.general.me_single_endpoint_outage_disable_quarantine,
@ -530,6 +532,8 @@ pub async fn me_config_updater(
cfg.general.me_bind_stale_ttl_secs,
cfg.general.me_secret_atomic_snapshot,
cfg.general.me_deterministic_writer_sort,
cfg.general.me_writer_pick_mode,
cfg.general.me_writer_pick_sample_size,
cfg.general.me_single_endpoint_shadow_writers,
cfg.general.me_single_endpoint_outage_mode_enabled,
cfg.general.me_single_endpoint_outage_disable_quarantine,

View File

@ -7,7 +7,9 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex, Notify, RwLock, mpsc};
use tokio_util::sync::CancellationToken;
-use crate::config::{MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy};
+use crate::config::{
MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode,
};
use crate::crypto::SecureRandom;
use crate::network::IpFamily;
use crate::network::probe::NetworkDecision;
@ -39,6 +41,7 @@ pub struct MeWriter {
pub tx: mpsc::Sender<WriterCommand>,
pub cancel: CancellationToken,
pub degraded: Arc<AtomicBool>,
pub rtt_ema_ms_x10: Arc<AtomicU32>,
pub draining: Arc<AtomicBool>,
pub draining_started_at_epoch_secs: Arc<AtomicU64>,
pub drain_deadline_epoch_secs: Arc<AtomicU64>,
@ -177,6 +180,8 @@ pub struct MePool {
pub(super) me_bind_stale_ttl_secs: AtomicU64,
pub(super) secret_atomic_snapshot: AtomicBool,
pub(super) me_deterministic_writer_sort: AtomicBool,
pub(super) me_writer_pick_mode: AtomicU8,
pub(super) me_writer_pick_sample_size: AtomicU8,
pub(super) me_socks_kdf_policy: AtomicU8,
pub(super) me_route_no_writer_mode: AtomicU8,
pub(super) me_route_no_writer_wait: Duration,
@ -274,6 +279,8 @@ impl MePool {
me_bind_stale_ttl_secs: u64,
me_secret_atomic_snapshot: bool,
me_deterministic_writer_sort: bool,
me_writer_pick_mode: MeWriterPickMode,
me_writer_pick_sample_size: u8,
me_socks_kdf_policy: MeSocksKdfPolicy,
me_writer_cmd_channel_capacity: usize,
me_route_channel_capacity: usize,
@ -450,6 +457,8 @@ impl MePool {
me_bind_stale_ttl_secs: AtomicU64::new(me_bind_stale_ttl_secs),
secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot),
me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort),
me_writer_pick_mode: AtomicU8::new(me_writer_pick_mode.as_u8()),
me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)),
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
@ -489,6 +498,8 @@ impl MePool {
bind_stale_ttl_secs: u64,
secret_atomic_snapshot: bool,
deterministic_writer_sort: bool,
writer_pick_mode: MeWriterPickMode,
writer_pick_sample_size: u8,
single_endpoint_shadow_writers: u8,
single_endpoint_outage_mode_enabled: bool,
single_endpoint_outage_disable_quarantine: bool,
@ -535,6 +546,14 @@ impl MePool {
.store(secret_atomic_snapshot, Ordering::Relaxed);
self.me_deterministic_writer_sort
.store(deterministic_writer_sort, Ordering::Relaxed);
let previous_writer_pick_mode = self.writer_pick_mode();
self.me_writer_pick_mode
.store(writer_pick_mode.as_u8(), Ordering::Relaxed);
self.me_writer_pick_sample_size
.store(writer_pick_sample_size.clamp(2, 4), Ordering::Relaxed);
if previous_writer_pick_mode != writer_pick_mode {
self.stats.increment_me_writer_pick_mode_switch_total();
}
self.me_single_endpoint_shadow_writers
.store(single_endpoint_shadow_writers, Ordering::Relaxed);
self.me_single_endpoint_outage_mode_enabled
@ -692,6 +711,16 @@ impl MePool {
MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed))
}
pub(super) fn writer_pick_mode(&self) -> MeWriterPickMode {
MeWriterPickMode::from_u8(self.me_writer_pick_mode.load(Ordering::Relaxed))
}
pub(super) fn writer_pick_sample_size(&self) -> usize {
self.me_writer_pick_sample_size
.load(Ordering::Relaxed)
.clamp(2, 4) as usize
}
pub(super) fn required_writers_for_dc(&self, endpoint_count: usize) -> usize {
if endpoint_count == 0 {
return 0;
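
Because the pick mode is hot-reloadable, the pool stores it as an AtomicU8 and converts through as_u8/from_u8 on store/load, while the sample size is clamped to [2, 4] on both write and read. A minimal sketch of that pattern outside the pool (types reduced for illustration):

use std::sync::atomic::{AtomicU8, Ordering};

#[derive(Debug, Clone, Copy, PartialEq)]
enum PickMode { SortedRr, P2c }

impl PickMode {
    fn as_u8(self) -> u8 { match self { PickMode::SortedRr => 0, PickMode::P2c => 1 } }
    // Unknown raw values fall back to the default variant, so an out-of-range
    // byte can never panic the hot path.
    fn from_u8(raw: u8) -> Self { if raw == 0 { PickMode::SortedRr } else { PickMode::P2c } }
}

struct Knobs {
    mode: AtomicU8,
    sample_size: AtomicU8,
}

impl Knobs {
    fn apply(&self, mode: PickMode, sample_size: u8) {
        self.mode.store(mode.as_u8(), Ordering::Relaxed);
        self.sample_size.store(sample_size.clamp(2, 4), Ordering::Relaxed);
    }
    fn mode(&self) -> PickMode { PickMode::from_u8(self.mode.load(Ordering::Relaxed)) }
}

fn main() {
    let knobs = Knobs { mode: AtomicU8::new(1), sample_size: AtomicU8::new(3) };
    knobs.apply(PickMode::SortedRr, 9); // 9 is clamped to 4
    assert_eq!(knobs.mode(), PickMode::SortedRr);
    assert_eq!(knobs.sample_size.load(Ordering::Relaxed), 4);
}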

View File

@ -1,4 +1,4 @@
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::Ordering;
@ -113,10 +113,35 @@ impl MePool {
contour: WriterContour,
allow_coverage_override: bool,
) -> bool {
-let candidates = self.connectable_endpoints(endpoints).await;
+let mut candidates = self.connectable_endpoints(endpoints).await;
if candidates.is_empty() {
return false;
}
if candidates.len() > 1 {
let mut active_by_endpoint = HashMap::<SocketAddr, usize>::new();
let ws = self.writers.read().await;
for writer in ws.iter() {
if writer.draining.load(Ordering::Relaxed) {
continue;
}
if writer.writer_dc != dc {
continue;
}
if !matches!(
super::pool::WriterContour::from_u8(
writer.contour.load(Ordering::Relaxed),
),
super::pool::WriterContour::Active
) {
continue;
}
if candidates.contains(&writer.addr) {
*active_by_endpoint.entry(writer.addr).or_insert(0) += 1;
}
}
drop(ws);
candidates.sort_by_key(|addr| (active_by_endpoint.get(addr).copied().unwrap_or(0), *addr));
}
let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % candidates.len();
for offset in 0..candidates.len() {
let idx = (start + offset) % candidates.len();
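
The block added above counts the non-draining Active writers per endpoint of the target DC and sorts candidates by (active_writers, addr), so new writers land on the least-covered endpoint first with the address as a deterministic tie-break. A standalone sketch of just that ordering (addresses and counts made up):

use std::collections::HashMap;
use std::net::SocketAddr;

fn main() {
    let a: SocketAddr = "192.0.2.1:8888".parse().unwrap();
    let b: SocketAddr = "192.0.2.2:8888".parse().unwrap();
    let c: SocketAddr = "192.0.2.3:8888".parse().unwrap();

    // Active writer counts as the pool would observe them; c has none yet.
    let mut active_by_endpoint = HashMap::new();
    active_by_endpoint.insert(a, 3usize);
    active_by_endpoint.insert(b, 1usize);

    let mut candidates = vec![a, b, c];
    // Least-covered endpoint first; SocketAddr breaks ties deterministically.
    candidates.sort_by_key(|addr| (active_by_endpoint.get(addr).copied().unwrap_or(0), *addr));
    assert_eq!(candidates, vec![c, b, a]);
}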

View File

@ -25,6 +25,7 @@ pub(crate) struct MeApiWriterStatusSnapshot {
pub(crate) struct MeApiDcStatusSnapshot {
pub dc: i16,
pub endpoints: Vec<SocketAddr>,
pub endpoint_writers: Vec<MeApiDcEndpointWriterSnapshot>,
pub available_endpoints: usize,
pub available_pct: f64,
pub required_writers: usize,
@ -38,6 +39,12 @@ pub(crate) struct MeApiDcStatusSnapshot {
pub load: usize,
}
#[derive(Clone, Debug)]
pub(crate) struct MeApiDcEndpointWriterSnapshot {
pub endpoint: SocketAddr,
pub active_writers: usize,
}
#[derive(Clone, Debug)]
pub(crate) struct MeApiStatusSnapshot {
pub generated_at_epoch_secs: u64,
@ -118,6 +125,8 @@ pub(crate) struct MeApiRuntimeSnapshot {
pub me_single_endpoint_outage_backoff_max_ms: u64,
pub me_single_endpoint_shadow_rotate_every_secs: u64,
pub me_deterministic_writer_sort: bool,
pub me_writer_pick_mode: &'static str,
pub me_writer_pick_sample_size: u8,
pub me_socks_kdf_policy: &'static str,
pub quarantined_endpoints: Vec<MeApiQuarantinedEndpointSnapshot>,
pub network_path: Vec<MeApiDcPathSnapshot>,
@ -338,6 +347,16 @@ impl MePool {
dcs.push(MeApiDcStatusSnapshot {
dc,
endpoint_writers: endpoints
.iter()
.map(|endpoint| MeApiDcEndpointWriterSnapshot {
endpoint: *endpoint,
active_writers: live_writers_by_dc_endpoint
.get(&(dc, *endpoint))
.copied()
.unwrap_or(0),
})
.collect(),
endpoints: endpoints.into_iter().collect(),
available_endpoints: dc_available_endpoints,
available_pct: ratio_pct(dc_available_endpoints, endpoint_count),
@ -522,6 +541,8 @@ impl MePool {
me_deterministic_writer_sort: self
.me_deterministic_writer_sort
.load(Ordering::Relaxed),
me_writer_pick_mode: writer_pick_mode_label(self.writer_pick_mode()),
me_writer_pick_sample_size: self.writer_pick_sample_size() as u8,
me_socks_kdf_policy: socks_kdf_policy_label(self.socks_kdf_policy()),
quarantined_endpoints,
network_path,
@ -570,6 +591,13 @@ fn bind_stale_mode_label(mode: MeBindStaleMode) -> &'static str {
}
}
fn writer_pick_mode_label(mode: crate::config::MeWriterPickMode) -> &'static str {
match mode {
crate::config::MeWriterPickMode::SortedRr => "sorted_rr",
crate::config::MeWriterPickMode::P2c => "p2c",
}
}
fn socks_kdf_policy_label(policy: MeSocksKdfPolicy) -> &'static str {
match policy {
MeSocksKdfPolicy::Strict => "strict",
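
Since the API-model counterpart DcEndpointWriters derives Serialize, each DC entry now carries an endpoint_writers array pairing every endpoint with its live active-writer count, zero-filled when no writer is up. A minimal serialization sketch (assuming serde_json; field names taken from the structs in this change, addresses illustrative):

use serde::Serialize;

#[derive(Serialize)]
struct DcEndpointWriters {
    endpoint: String,
    active_writers: usize,
}

fn main() {
    let rows = vec![
        DcEndpointWriters { endpoint: "192.0.2.10:8888".to_string(), active_writers: 2 },
        DcEndpointWriters { endpoint: "192.0.2.11:8888".to_string(), active_writers: 0 },
    ];
    // One array element per endpoint of the DC.
    println!("{}", serde_json::to_string_pretty(&rows).unwrap());
}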

View File

@ -1,6 +1,6 @@
use std::net::SocketAddr;
use std::sync::Arc;
-use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use std::io::ErrorKind;
@ -128,6 +128,7 @@ impl MePool {
let contour = Arc::new(AtomicU8::new(contour.as_u8()));
let cancel = CancellationToken::new();
let degraded = Arc::new(AtomicBool::new(false));
let rtt_ema_ms_x10 = Arc::new(AtomicU32::new(0));
let draining = Arc::new(AtomicBool::new(false));
let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0));
let drain_deadline_epoch_secs = Arc::new(AtomicU64::new(0));
@ -169,6 +170,7 @@ impl MePool {
tx: tx.clone(),
cancel: cancel.clone(),
degraded: degraded.clone(),
rtt_ema_ms_x10: rtt_ema_ms_x10.clone(),
draining: draining.clone(),
draining_started_at_epoch_secs: draining_started_at_epoch_secs.clone(),
drain_deadline_epoch_secs: drain_deadline_epoch_secs.clone(),
@ -222,6 +224,7 @@ impl MePool {
stats_reader,
writer_id,
degraded.clone(),
rtt_ema_ms_x10.clone(),
cancel_reader_token.clone(),
)
.await;

View File

@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::io::ErrorKind;
use std::sync::Arc;
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::time::Instant;
use bytes::{Bytes, BytesMut};
@ -34,6 +34,7 @@ pub(crate) async fn reader_loop(
stats: Arc<Stats>,
_writer_id: u64,
degraded: Arc<AtomicBool>,
writer_rtt_ema_ms_x10: Arc<AtomicU32>,
cancel: CancellationToken,
) -> Result<()> {
let mut raw = enc_leftover;
@ -208,6 +209,8 @@ pub(crate) async fn reader_loop(
}
let degraded_now = entry.1 > entry.0 * 2.0;
degraded.store(degraded_now, Ordering::Relaxed);
writer_rtt_ema_ms_x10
.store((entry.1 * 10.0).clamp(0.0, u32::MAX as f64) as u32, Ordering::Relaxed);
trace!(writer_id = wid, rtt_ms = rtt, ema_ms = entry.1, base_ms = entry.0, degraded = degraded_now, "ME RTT sample");
}
} else {
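
The reader stores the RTT EMA in tenths of a millisecond so it fits a lock-free AtomicU32; the picker later divides it back down with +5 for rounding and caps the resulting penalty at 400. A small arithmetic sketch mirroring those two expressions (inputs made up):

use std::sync::atomic::{AtomicU32, Ordering};

fn main() {
    let rtt_ema_ms_x10 = AtomicU32::new(0);

    // Writer side: an EMA of 12.34 ms is stored as 123 (tenths of a millisecond).
    let ema_ms: f64 = 12.34;
    rtt_ema_ms_x10.store((ema_ms * 10.0).clamp(0.0, u32::MAX as f64) as u32, Ordering::Relaxed);

    // Picker side: convert back to whole milliseconds with rounding, cap at 400.
    let raw = rtt_ema_ms_x10.load(Ordering::Relaxed) as u64;
    let rtt_penalty = (raw.saturating_add(5) / 10).min(400);
    assert_eq!(rtt_penalty, 12); // (123 + 5) / 10 = 12
}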

View File

@ -9,7 +9,7 @@ use bytes::Bytes;
use tokio::sync::mpsc::error::TrySendError;
use tracing::{debug, warn};
-use crate::config::MeRouteNoWriterMode;
+use crate::config::{MeRouteNoWriterMode, MeWriterPickMode};
use crate::error::{ProxyError, Result};
use crate::network::IpFamily;
use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
@ -24,6 +24,10 @@ use super::registry::ConnMeta;
const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45;
const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55;
const HYBRID_GLOBAL_BURST_PERIOD_ROUNDS: u32 = 4;
const PICK_PENALTY_WARM: u64 = 200;
const PICK_PENALTY_DRAINING: u64 = 600;
const PICK_PENALTY_STALE: u64 = 300;
const PICK_PENALTY_DEGRADED: u64 = 250;
impl MePool {
/// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
@ -181,6 +185,7 @@ impl MePool {
.await;
}
if candidate_indices.is_empty() {
let pick_mode = self.writer_pick_mode();
match no_writer_mode {
MeRouteNoWriterMode::AsyncRecoveryFailfast => {
let deadline = *no_writer_deadline.get_or_insert_with(|| {
@ -196,6 +201,7 @@ impl MePool {
if self.wait_for_candidate_until(routed_dc, deadline).await {
continue;
}
self.stats.increment_me_writer_pick_no_candidate_total(pick_mode);
self.stats.increment_me_no_writer_failfast_total();
return Err(ProxyError::Proxy(
"No ME writers available for target DC in failfast window".into(),
@ -209,10 +215,12 @@ impl MePool {
if self.wait_for_candidate_until(routed_dc, deadline).await {
continue;
}
self.stats.increment_me_writer_pick_no_candidate_total(pick_mode);
self.stats.increment_me_no_writer_failfast_total();
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
}
if emergency_attempts >= self.me_route_inline_recovery_attempts.max(1) {
self.stats.increment_me_writer_pick_no_candidate_total(pick_mode);
self.stats.increment_me_no_writer_failfast_total();
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
}
@ -237,6 +245,7 @@ impl MePool {
.await;
}
if candidate_indices.is_empty() {
self.stats.increment_me_writer_pick_no_candidate_total(pick_mode);
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
}
}
@ -259,6 +268,8 @@ impl MePool {
}
}
hybrid_wait_current = hybrid_wait_step;
let pick_mode = self.writer_pick_mode();
let pick_sample_size = self.writer_pick_sample_size();
let writer_ids: Vec<u64> = candidate_indices
.iter()
.map(|idx| writers_snapshot[*idx].id)
@ -268,69 +279,84 @@ impl MePool {
.writer_idle_since_for_writer_ids(&writer_ids)
.await;
let now_epoch_secs = Self::now_epoch_secs();
if self.me_deterministic_writer_sort.load(Ordering::Relaxed) {
candidate_indices.sort_by(|lhs, rhs| {
let left = &writers_snapshot[*lhs];
let right = &writers_snapshot[*rhs];
let left_key = (
self.writer_contour_rank_for_selection(left),
(left.generation < self.current_generation()) as usize,
left.degraded.load(Ordering::Relaxed) as usize,
self.writer_idle_rank_for_selection(
left,
&writer_idle_since,
now_epoch_secs,
),
Reverse(left.tx.capacity()),
left.addr,
left.id,
);
let right_key = (
self.writer_contour_rank_for_selection(right),
(right.generation < self.current_generation()) as usize,
right.degraded.load(Ordering::Relaxed) as usize,
self.writer_idle_rank_for_selection(
right,
&writer_idle_since,
now_epoch_secs,
),
Reverse(right.tx.capacity()),
right.addr,
right.id,
);
left_key.cmp(&right_key)
});
} else {
candidate_indices.sort_by_key(|idx| {
let w = &writers_snapshot[*idx];
let degraded = w.degraded.load(Ordering::Relaxed);
let stale = (w.generation < self.current_generation()) as usize;
(
self.writer_contour_rank_for_selection(w),
stale,
degraded as usize,
self.writer_idle_rank_for_selection(
w,
&writer_idle_since,
now_epoch_secs,
),
Reverse(w.tx.capacity()),
)
});
}
let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len();
let ordered_candidate_indices = if pick_mode == MeWriterPickMode::P2c {
self.p2c_ordered_candidate_indices(
&candidate_indices,
&writers_snapshot,
&writer_idle_since,
now_epoch_secs,
start,
pick_sample_size,
)
} else {
if self.me_deterministic_writer_sort.load(Ordering::Relaxed) {
candidate_indices.sort_by(|lhs, rhs| {
let left = &writers_snapshot[*lhs];
let right = &writers_snapshot[*rhs];
let left_key = (
self.writer_contour_rank_for_selection(left),
(left.generation < self.current_generation()) as usize,
left.degraded.load(Ordering::Relaxed) as usize,
self.writer_idle_rank_for_selection(
left,
&writer_idle_since,
now_epoch_secs,
),
Reverse(left.tx.capacity()),
left.addr,
left.id,
);
let right_key = (
self.writer_contour_rank_for_selection(right),
(right.generation < self.current_generation()) as usize,
right.degraded.load(Ordering::Relaxed) as usize,
self.writer_idle_rank_for_selection(
right,
&writer_idle_since,
now_epoch_secs,
),
Reverse(right.tx.capacity()),
right.addr,
right.id,
);
left_key.cmp(&right_key)
});
} else {
candidate_indices.sort_by_key(|idx| {
let w = &writers_snapshot[*idx];
let degraded = w.degraded.load(Ordering::Relaxed);
let stale = (w.generation < self.current_generation()) as usize;
(
self.writer_contour_rank_for_selection(w),
stale,
degraded as usize,
self.writer_idle_rank_for_selection(
w,
&writer_idle_since,
now_epoch_secs,
),
Reverse(w.tx.capacity()),
)
});
}
let mut ordered = Vec::<usize>::with_capacity(candidate_indices.len());
for offset in 0..candidate_indices.len() {
ordered.push(candidate_indices[(start + offset) % candidate_indices.len()]);
}
ordered
};
let mut fallback_blocking_idx: Option<usize> = None;
-for offset in 0..candidate_indices.len() {
-let idx = candidate_indices[(start + offset) % candidate_indices.len()];
+for idx in ordered_candidate_indices {
let w = &writers_snapshot[idx];
if !self.writer_accepts_new_binding(w) {
continue;
}
match w.tx.try_send(WriterCommand::Data(payload.clone())) {
Ok(()) => {
self.stats.increment_me_writer_pick_success_try_total(pick_mode);
self.registry
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
.await;
@ -352,6 +378,7 @@ impl MePool {
}
}
Err(TrySendError::Closed(_)) => {
self.stats.increment_me_writer_pick_closed_total(pick_mode);
warn!(writer_id = w.id, "ME writer channel closed");
self.remove_writer_and_close_clients(w.id).await;
continue;
@ -360,15 +387,20 @@ impl MePool {
}
let Some(blocking_idx) = fallback_blocking_idx else {
self.stats.increment_me_writer_pick_full_total(pick_mode);
continue;
};
let w = writers_snapshot[blocking_idx].clone();
if !self.writer_accepts_new_binding(&w) {
self.stats.increment_me_writer_pick_full_total(pick_mode);
continue;
}
self.stats.increment_me_writer_pick_blocking_fallback_total();
match w.tx.send(WriterCommand::Data(payload.clone())).await {
Ok(()) => {
self.stats
.increment_me_writer_pick_success_fallback_total(pick_mode);
self.registry
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
.await;
@ -378,6 +410,7 @@ impl MePool {
return Ok(());
}
Err(_) => {
self.stats.increment_me_writer_pick_closed_total(pick_mode);
warn!(writer_id = w.id, "ME writer channel closed (blocking)");
self.remove_writer_and_close_clients(w.id).await;
}
@ -626,4 +659,87 @@ impl MePool {
0
}
}
fn writer_pick_score(
&self,
writer: &super::pool::MeWriter,
idle_since_by_writer: &HashMap<u64, u64>,
now_epoch_secs: u64,
) -> u64 {
let contour_penalty = match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) {
WriterContour::Active => 0,
WriterContour::Warm => PICK_PENALTY_WARM,
WriterContour::Draining => PICK_PENALTY_DRAINING,
};
let stale_penalty = if writer.generation < self.current_generation() {
PICK_PENALTY_STALE
} else {
0
};
let degraded_penalty = if writer.degraded.load(Ordering::Relaxed) {
PICK_PENALTY_DEGRADED
} else {
0
};
let idle_penalty =
(self.writer_idle_rank_for_selection(writer, idle_since_by_writer, now_epoch_secs) as u64)
* 100;
let queue_cap = self.writer_cmd_channel_capacity.max(1) as u64;
let queue_remaining = writer.tx.capacity() as u64;
let queue_used = queue_cap.saturating_sub(queue_remaining.min(queue_cap));
let queue_util_pct = queue_used.saturating_mul(100) / queue_cap;
let queue_penalty = queue_util_pct.saturating_mul(4);
let rtt_penalty = ((writer.rtt_ema_ms_x10.load(Ordering::Relaxed) as u64).saturating_add(5) / 10)
.min(400);
contour_penalty
.saturating_add(stale_penalty)
.saturating_add(degraded_penalty)
.saturating_add(idle_penalty)
.saturating_add(queue_penalty)
.saturating_add(rtt_penalty)
}
fn p2c_ordered_candidate_indices(
&self,
candidate_indices: &[usize],
writers_snapshot: &[super::pool::MeWriter],
idle_since_by_writer: &HashMap<u64, u64>,
now_epoch_secs: u64,
start: usize,
sample_size: usize,
) -> Vec<usize> {
let total = candidate_indices.len();
if total == 0 {
return Vec::new();
}
let mut sampled = Vec::<usize>::with_capacity(sample_size.min(total));
let mut seen = HashSet::<usize>::with_capacity(total);
for offset in 0..sample_size.min(total) {
let idx = candidate_indices[(start + offset) % total];
if seen.insert(idx) {
sampled.push(idx);
}
}
sampled.sort_by_key(|idx| {
let writer = &writers_snapshot[*idx];
(
self.writer_pick_score(writer, idle_since_by_writer, now_epoch_secs),
writer.addr,
writer.id,
)
});
let mut ordered = Vec::<usize>::with_capacity(total);
ordered.extend(sampled.iter().copied());
for offset in 0..total {
let idx = candidate_indices[(start + offset) % total];
if seen.insert(idx) {
ordered.push(idx);
}
}
ordered
}
}
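
To make the additive score concrete: with the constants above, a warm, degraded writer at 60% command-queue utilisation with a 12.3 ms RTT EMA and no idle or staleness penalty scores 200 + 250 + 240 + 12 = 702, while a fresh active writer with an empty queue scores near 0. The p2c path samples sample_size candidates at a rotating offset, orders the sample by this score, and appends the remaining candidates in rotation order as fallbacks. A worked sketch of just the arithmetic (constants copied from above, inputs made up):

fn main() {
    const PICK_PENALTY_WARM: u64 = 200;
    const PICK_PENALTY_DEGRADED: u64 = 250;

    // Warm + degraded writer, 60% of the command queue used, 12.3 ms RTT EMA.
    let contour_penalty = PICK_PENALTY_WARM;
    let degraded_penalty = PICK_PENALTY_DEGRADED;
    let queue_util_pct: u64 = 60;
    let queue_penalty = queue_util_pct * 4; // 240
    let rtt_ema_ms_x10: u64 = 123;
    let rtt_penalty = ((rtt_ema_ms_x10 + 5) / 10).min(400); // 12

    let score = contour_penalty + degraded_penalty + queue_penalty + rtt_penalty;
    assert_eq!(score, 702);
}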