From fca0e3f619bdad5ac7f0a549118d1728bb58357a Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 8 Mar 2026 03:06:45 +0300 Subject: [PATCH] ME Writer Pick in Metrics+API --- src/api/model.rs | 9 +++ src/api/runtime_stats.rs | 18 +++-- src/api/runtime_zero.rs | 13 +++- src/metrics.rs | 129 ++++++++++++++++++++++++++++++++++ src/stats/mod.rs | 147 ++++++++++++++++++++++++++++++++++++++- 5 files changed, 310 insertions(+), 6 deletions(-) diff --git a/src/api/model.rs b/src/api/model.rs index fd678f6..0bc52de 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -266,6 +266,7 @@ pub(super) struct MeWritersData { pub(super) struct DcStatus { pub(super) dc: i16, pub(super) endpoints: Vec, + pub(super) endpoint_writers: Vec, pub(super) available_endpoints: usize, pub(super) available_pct: f64, pub(super) required_writers: usize, @@ -279,6 +280,12 @@ pub(super) struct DcStatus { pub(super) load: usize, } +#[derive(Serialize, Clone)] +pub(super) struct DcEndpointWriters { + pub(super) endpoint: String, + pub(super) active_writers: usize, +} + #[derive(Serialize, Clone)] pub(super) struct DcStatusData { pub(super) middle_proxy_enabled: bool, @@ -354,6 +361,8 @@ pub(super) struct MinimalMeRuntimeData { pub(super) me_single_endpoint_outage_backoff_max_ms: u64, pub(super) me_single_endpoint_shadow_rotate_every_secs: u64, pub(super) me_deterministic_writer_sort: bool, + pub(super) me_writer_pick_mode: &'static str, + pub(super) me_writer_pick_sample_size: u8, pub(super) me_socks_kdf_policy: &'static str, pub(super) quarantined_endpoints_total: usize, pub(super) quarantined_endpoints: Vec, diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index f90abe3..139a4c5 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -7,10 +7,10 @@ use crate::transport::UpstreamRouteKind; use super::ApiShared; use super::model::{ - DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary, MinimalAllData, - MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData, MinimalQuarantineData, - UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData, ZeroAllData, - ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData, + DcEndpointWriters, DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary, + MinimalAllData, MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData, + MinimalQuarantineData, UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData, + ZeroAllData, ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData, ZeroUpstreamData, }; @@ -346,6 +346,14 @@ async fn get_minimal_payload_cached( .into_iter() .map(|value| value.to_string()) .collect(), + endpoint_writers: entry + .endpoint_writers + .into_iter() + .map(|coverage| DcEndpointWriters { + endpoint: coverage.endpoint.to_string(), + active_writers: coverage.active_writers, + }) + .collect(), available_endpoints: entry.available_endpoints, available_pct: entry.available_pct, required_writers: entry.required_writers, @@ -422,6 +430,8 @@ async fn get_minimal_payload_cached( me_single_endpoint_shadow_rotate_every_secs: runtime .me_single_endpoint_shadow_rotate_every_secs, me_deterministic_writer_sort: runtime.me_deterministic_writer_sort, + me_writer_pick_mode: runtime.me_writer_pick_mode, + me_writer_pick_sample_size: runtime.me_writer_pick_sample_size, me_socks_kdf_policy: runtime.me_socks_kdf_policy, quarantined_endpoints_total: runtime.quarantined_endpoints.len(), quarantined_endpoints: runtime diff --git a/src/api/runtime_zero.rs b/src/api/runtime_zero.rs index 5020705..7d3d778 100644 --- a/src/api/runtime_zero.rs +++ b/src/api/runtime_zero.rs @@ -2,7 +2,7 @@ use std::sync::atomic::Ordering; use serde::Serialize; -use crate::config::{MeFloorMode, ProxyConfig, UserMaxUniqueIpsMode}; +use crate::config::{MeFloorMode, MeWriterPickMode, ProxyConfig, UserMaxUniqueIpsMode}; use super::ApiShared; use super::runtime_init::build_runtime_startup_summary; @@ -78,6 +78,8 @@ pub(super) struct EffectiveMiddleProxyLimits { pub(super) reconnect_backoff_base_ms: u64, pub(super) reconnect_backoff_cap_ms: u64, pub(super) reconnect_fast_retry_count: u32, + pub(super) writer_pick_mode: &'static str, + pub(super) writer_pick_sample_size: u8, pub(super) me2dc_fallback: bool, } @@ -237,6 +239,8 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms, reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms, reconnect_fast_retry_count: cfg.general.me_reconnect_fast_retry_count, + writer_pick_mode: me_writer_pick_mode_label(cfg.general.me_writer_pick_mode), + writer_pick_sample_size: cfg.general.me_writer_pick_sample_size, me2dc_fallback: cfg.general.me2dc_fallback, }, user_ip_policy: EffectiveUserIpPolicyLimits { @@ -274,3 +278,10 @@ fn me_floor_mode_label(mode: MeFloorMode) -> &'static str { MeFloorMode::Adaptive => "adaptive", } } + +fn me_writer_pick_mode_label(mode: MeWriterPickMode) -> &'static str { + match mode { + MeWriterPickMode::SortedRr => "sorted_rr", + MeWriterPickMode::P2c => "p2c", + } +} diff --git a/src/metrics.rs b/src/metrics.rs index b338df5..917c9b3 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -689,6 +689,135 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_me_writer_pick_total ME writer-pick outcomes by mode and result" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_pick_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"success_try\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_sorted_rr_success_try_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"success_fallback\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_sorted_rr_success_fallback_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"full\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_sorted_rr_full_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"closed\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_sorted_rr_closed_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"no_candidate\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_sorted_rr_no_candidate_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"p2c\",result=\"success_try\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_p2c_success_try_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"p2c\",result=\"success_fallback\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_p2c_success_fallback_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"p2c\",result=\"full\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_p2c_full_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"p2c\",result=\"closed\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_p2c_closed_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_total{{mode=\"p2c\",result=\"no_candidate\"}} {}", + if me_allows_normal { + stats.get_me_writer_pick_p2c_no_candidate_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_pick_blocking_fallback_total ME writer-pick blocking fallback attempts" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_writer_pick_blocking_fallback_total counter" + ); + let _ = writeln!( + out, + "telemt_me_writer_pick_blocking_fallback_total {}", + if me_allows_normal { + stats.get_me_writer_pick_blocking_fallback_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_pick_mode_switch_total Writer-pick mode switches via runtime updates" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_pick_mode_switch_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_pick_mode_switch_total {}", + if me_allows_normal { + stats.get_me_writer_pick_mode_switch_total() + } else { + 0 + } + ); + let _ = writeln!( out, "# HELP telemt_me_socks_kdf_policy_total SOCKS KDF policy outcomes" diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 10d8882..25905b2 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -16,7 +16,7 @@ use std::collections::hash_map::DefaultHasher; use std::collections::VecDeque; use tracing::debug; -use crate::config::MeTelemetryLevel; +use crate::config::{MeTelemetryLevel, MeWriterPickMode}; use self::telemetry::TelemetryPolicy; // ============= Stats ============= @@ -95,6 +95,18 @@ pub struct Stats { me_route_drop_queue_full: AtomicU64, me_route_drop_queue_full_base: AtomicU64, me_route_drop_queue_full_high: AtomicU64, + me_writer_pick_sorted_rr_success_try_total: AtomicU64, + me_writer_pick_sorted_rr_success_fallback_total: AtomicU64, + me_writer_pick_sorted_rr_full_total: AtomicU64, + me_writer_pick_sorted_rr_closed_total: AtomicU64, + me_writer_pick_sorted_rr_no_candidate_total: AtomicU64, + me_writer_pick_p2c_success_try_total: AtomicU64, + me_writer_pick_p2c_success_fallback_total: AtomicU64, + me_writer_pick_p2c_full_total: AtomicU64, + me_writer_pick_p2c_closed_total: AtomicU64, + me_writer_pick_p2c_no_candidate_total: AtomicU64, + me_writer_pick_blocking_fallback_total: AtomicU64, + me_writer_pick_mode_switch_total: AtomicU64, me_socks_kdf_strict_reject: AtomicU64, me_socks_kdf_compat_fallback: AtomicU64, secure_padding_invalid: AtomicU64, @@ -497,6 +509,93 @@ impl Stats { self.me_route_drop_queue_full_high.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_writer_pick_success_try_total(&self, mode: MeWriterPickMode) { + if !self.telemetry_me_allows_normal() { + return; + } + match mode { + MeWriterPickMode::SortedRr => { + self.me_writer_pick_sorted_rr_success_try_total + .fetch_add(1, Ordering::Relaxed); + } + MeWriterPickMode::P2c => { + self.me_writer_pick_p2c_success_try_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_writer_pick_success_fallback_total(&self, mode: MeWriterPickMode) { + if !self.telemetry_me_allows_normal() { + return; + } + match mode { + MeWriterPickMode::SortedRr => { + self.me_writer_pick_sorted_rr_success_fallback_total + .fetch_add(1, Ordering::Relaxed); + } + MeWriterPickMode::P2c => { + self.me_writer_pick_p2c_success_fallback_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_writer_pick_full_total(&self, mode: MeWriterPickMode) { + if !self.telemetry_me_allows_normal() { + return; + } + match mode { + MeWriterPickMode::SortedRr => { + self.me_writer_pick_sorted_rr_full_total + .fetch_add(1, Ordering::Relaxed); + } + MeWriterPickMode::P2c => { + self.me_writer_pick_p2c_full_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_writer_pick_closed_total(&self, mode: MeWriterPickMode) { + if !self.telemetry_me_allows_normal() { + return; + } + match mode { + MeWriterPickMode::SortedRr => { + self.me_writer_pick_sorted_rr_closed_total + .fetch_add(1, Ordering::Relaxed); + } + MeWriterPickMode::P2c => { + self.me_writer_pick_p2c_closed_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_writer_pick_no_candidate_total(&self, mode: MeWriterPickMode) { + if !self.telemetry_me_allows_normal() { + return; + } + match mode { + MeWriterPickMode::SortedRr => { + self.me_writer_pick_sorted_rr_no_candidate_total + .fetch_add(1, Ordering::Relaxed); + } + MeWriterPickMode::P2c => { + self.me_writer_pick_p2c_no_candidate_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_writer_pick_blocking_fallback_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_pick_blocking_fallback_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_pick_mode_switch_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_pick_mode_switch_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_socks_kdf_strict_reject(&self) { if self.telemetry_me_allows_normal() { self.me_socks_kdf_strict_reject.fetch_add(1, Ordering::Relaxed); @@ -1001,6 +1100,52 @@ impl Stats { pub fn get_me_route_drop_queue_full_high(&self) -> u64 { self.me_route_drop_queue_full_high.load(Ordering::Relaxed) } + pub fn get_me_writer_pick_sorted_rr_success_try_total(&self) -> u64 { + self.me_writer_pick_sorted_rr_success_try_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_sorted_rr_success_fallback_total(&self) -> u64 { + self.me_writer_pick_sorted_rr_success_fallback_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_sorted_rr_full_total(&self) -> u64 { + self.me_writer_pick_sorted_rr_full_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_sorted_rr_closed_total(&self) -> u64 { + self.me_writer_pick_sorted_rr_closed_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_sorted_rr_no_candidate_total(&self) -> u64 { + self.me_writer_pick_sorted_rr_no_candidate_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_p2c_success_try_total(&self) -> u64 { + self.me_writer_pick_p2c_success_try_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_p2c_success_fallback_total(&self) -> u64 { + self.me_writer_pick_p2c_success_fallback_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_p2c_full_total(&self) -> u64 { + self.me_writer_pick_p2c_full_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_p2c_closed_total(&self) -> u64 { + self.me_writer_pick_p2c_closed_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_p2c_no_candidate_total(&self) -> u64 { + self.me_writer_pick_p2c_no_candidate_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_blocking_fallback_total(&self) -> u64 { + self.me_writer_pick_blocking_fallback_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_pick_mode_switch_total(&self) -> u64 { + self.me_writer_pick_mode_switch_total + .load(Ordering::Relaxed) + } pub fn get_me_socks_kdf_strict_reject(&self) -> u64 { self.me_socks_kdf_strict_reject.load(Ordering::Relaxed) }