ME Writer Pick in Metrics+API

This commit is contained in:
Alexey 2026-03-08 03:06:45 +03:00
parent 9401c46727
commit fca0e3f619
No known key found for this signature in database
5 changed files with 310 additions and 6 deletions

View File

@ -266,6 +266,7 @@ pub(super) struct MeWritersData {
pub(super) struct DcStatus { pub(super) struct DcStatus {
pub(super) dc: i16, pub(super) dc: i16,
pub(super) endpoints: Vec<String>, pub(super) endpoints: Vec<String>,
pub(super) endpoint_writers: Vec<DcEndpointWriters>,
pub(super) available_endpoints: usize, pub(super) available_endpoints: usize,
pub(super) available_pct: f64, pub(super) available_pct: f64,
pub(super) required_writers: usize, pub(super) required_writers: usize,
@ -279,6 +280,12 @@ pub(super) struct DcStatus {
pub(super) load: usize, pub(super) load: usize,
} }
#[derive(Serialize, Clone)]
pub(super) struct DcEndpointWriters {
pub(super) endpoint: String,
pub(super) active_writers: usize,
}
#[derive(Serialize, Clone)] #[derive(Serialize, Clone)]
pub(super) struct DcStatusData { pub(super) struct DcStatusData {
pub(super) middle_proxy_enabled: bool, pub(super) middle_proxy_enabled: bool,
@ -354,6 +361,8 @@ pub(super) struct MinimalMeRuntimeData {
pub(super) me_single_endpoint_outage_backoff_max_ms: u64, pub(super) me_single_endpoint_outage_backoff_max_ms: u64,
pub(super) me_single_endpoint_shadow_rotate_every_secs: u64, pub(super) me_single_endpoint_shadow_rotate_every_secs: u64,
pub(super) me_deterministic_writer_sort: bool, pub(super) me_deterministic_writer_sort: bool,
pub(super) me_writer_pick_mode: &'static str,
pub(super) me_writer_pick_sample_size: u8,
pub(super) me_socks_kdf_policy: &'static str, pub(super) me_socks_kdf_policy: &'static str,
pub(super) quarantined_endpoints_total: usize, pub(super) quarantined_endpoints_total: usize,
pub(super) quarantined_endpoints: Vec<MinimalQuarantineData>, pub(super) quarantined_endpoints: Vec<MinimalQuarantineData>,

View File

@ -7,10 +7,10 @@ use crate::transport::UpstreamRouteKind;
use super::ApiShared; use super::ApiShared;
use super::model::{ use super::model::{
DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary, MinimalAllData, DcEndpointWriters, DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary,
MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData, MinimalQuarantineData, MinimalAllData, MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData,
UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData, ZeroAllData, MinimalQuarantineData, UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData,
ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData, ZeroAllData, ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData,
ZeroUpstreamData, ZeroUpstreamData,
}; };
@ -346,6 +346,14 @@ async fn get_minimal_payload_cached(
.into_iter() .into_iter()
.map(|value| value.to_string()) .map(|value| value.to_string())
.collect(), .collect(),
endpoint_writers: entry
.endpoint_writers
.into_iter()
.map(|coverage| DcEndpointWriters {
endpoint: coverage.endpoint.to_string(),
active_writers: coverage.active_writers,
})
.collect(),
available_endpoints: entry.available_endpoints, available_endpoints: entry.available_endpoints,
available_pct: entry.available_pct, available_pct: entry.available_pct,
required_writers: entry.required_writers, required_writers: entry.required_writers,
@ -422,6 +430,8 @@ async fn get_minimal_payload_cached(
me_single_endpoint_shadow_rotate_every_secs: runtime me_single_endpoint_shadow_rotate_every_secs: runtime
.me_single_endpoint_shadow_rotate_every_secs, .me_single_endpoint_shadow_rotate_every_secs,
me_deterministic_writer_sort: runtime.me_deterministic_writer_sort, me_deterministic_writer_sort: runtime.me_deterministic_writer_sort,
me_writer_pick_mode: runtime.me_writer_pick_mode,
me_writer_pick_sample_size: runtime.me_writer_pick_sample_size,
me_socks_kdf_policy: runtime.me_socks_kdf_policy, me_socks_kdf_policy: runtime.me_socks_kdf_policy,
quarantined_endpoints_total: runtime.quarantined_endpoints.len(), quarantined_endpoints_total: runtime.quarantined_endpoints.len(),
quarantined_endpoints: runtime quarantined_endpoints: runtime

View File

@ -2,7 +2,7 @@ use std::sync::atomic::Ordering;
use serde::Serialize; use serde::Serialize;
use crate::config::{MeFloorMode, ProxyConfig, UserMaxUniqueIpsMode}; use crate::config::{MeFloorMode, MeWriterPickMode, ProxyConfig, UserMaxUniqueIpsMode};
use super::ApiShared; use super::ApiShared;
use super::runtime_init::build_runtime_startup_summary; use super::runtime_init::build_runtime_startup_summary;
@ -78,6 +78,8 @@ pub(super) struct EffectiveMiddleProxyLimits {
pub(super) reconnect_backoff_base_ms: u64, pub(super) reconnect_backoff_base_ms: u64,
pub(super) reconnect_backoff_cap_ms: u64, pub(super) reconnect_backoff_cap_ms: u64,
pub(super) reconnect_fast_retry_count: u32, pub(super) reconnect_fast_retry_count: u32,
pub(super) writer_pick_mode: &'static str,
pub(super) writer_pick_sample_size: u8,
pub(super) me2dc_fallback: bool, pub(super) me2dc_fallback: bool,
} }
@ -237,6 +239,8 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD
reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms, reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms,
reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms, reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms,
reconnect_fast_retry_count: cfg.general.me_reconnect_fast_retry_count, reconnect_fast_retry_count: cfg.general.me_reconnect_fast_retry_count,
writer_pick_mode: me_writer_pick_mode_label(cfg.general.me_writer_pick_mode),
writer_pick_sample_size: cfg.general.me_writer_pick_sample_size,
me2dc_fallback: cfg.general.me2dc_fallback, me2dc_fallback: cfg.general.me2dc_fallback,
}, },
user_ip_policy: EffectiveUserIpPolicyLimits { user_ip_policy: EffectiveUserIpPolicyLimits {
@ -274,3 +278,10 @@ fn me_floor_mode_label(mode: MeFloorMode) -> &'static str {
MeFloorMode::Adaptive => "adaptive", MeFloorMode::Adaptive => "adaptive",
} }
} }
fn me_writer_pick_mode_label(mode: MeWriterPickMode) -> &'static str {
match mode {
MeWriterPickMode::SortedRr => "sorted_rr",
MeWriterPickMode::P2c => "p2c",
}
}

View File

@ -689,6 +689,135 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
} }
); );
let _ = writeln!(
out,
"# HELP telemt_me_writer_pick_total ME writer-pick outcomes by mode and result"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_pick_total counter");
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"success_try\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_sorted_rr_success_try_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"success_fallback\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_sorted_rr_success_fallback_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"full\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_sorted_rr_full_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"closed\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_sorted_rr_closed_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"no_candidate\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_sorted_rr_no_candidate_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"success_try\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_p2c_success_try_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"success_fallback\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_p2c_success_fallback_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"full\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_p2c_full_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"closed\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_p2c_closed_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"no_candidate\"}} {}",
if me_allows_normal {
stats.get_me_writer_pick_p2c_no_candidate_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_pick_blocking_fallback_total ME writer-pick blocking fallback attempts"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_pick_blocking_fallback_total counter"
);
let _ = writeln!(
out,
"telemt_me_writer_pick_blocking_fallback_total {}",
if me_allows_normal {
stats.get_me_writer_pick_blocking_fallback_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_pick_mode_switch_total Writer-pick mode switches via runtime updates"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_pick_mode_switch_total counter");
let _ = writeln!(
out,
"telemt_me_writer_pick_mode_switch_total {}",
if me_allows_normal {
stats.get_me_writer_pick_mode_switch_total()
} else {
0
}
);
let _ = writeln!( let _ = writeln!(
out, out,
"# HELP telemt_me_socks_kdf_policy_total SOCKS KDF policy outcomes" "# HELP telemt_me_socks_kdf_policy_total SOCKS KDF policy outcomes"

View File

@ -16,7 +16,7 @@ use std::collections::hash_map::DefaultHasher;
use std::collections::VecDeque; use std::collections::VecDeque;
use tracing::debug; use tracing::debug;
use crate::config::MeTelemetryLevel; use crate::config::{MeTelemetryLevel, MeWriterPickMode};
use self::telemetry::TelemetryPolicy; use self::telemetry::TelemetryPolicy;
// ============= Stats ============= // ============= Stats =============
@ -95,6 +95,18 @@ pub struct Stats {
me_route_drop_queue_full: AtomicU64, me_route_drop_queue_full: AtomicU64,
me_route_drop_queue_full_base: AtomicU64, me_route_drop_queue_full_base: AtomicU64,
me_route_drop_queue_full_high: AtomicU64, me_route_drop_queue_full_high: AtomicU64,
me_writer_pick_sorted_rr_success_try_total: AtomicU64,
me_writer_pick_sorted_rr_success_fallback_total: AtomicU64,
me_writer_pick_sorted_rr_full_total: AtomicU64,
me_writer_pick_sorted_rr_closed_total: AtomicU64,
me_writer_pick_sorted_rr_no_candidate_total: AtomicU64,
me_writer_pick_p2c_success_try_total: AtomicU64,
me_writer_pick_p2c_success_fallback_total: AtomicU64,
me_writer_pick_p2c_full_total: AtomicU64,
me_writer_pick_p2c_closed_total: AtomicU64,
me_writer_pick_p2c_no_candidate_total: AtomicU64,
me_writer_pick_blocking_fallback_total: AtomicU64,
me_writer_pick_mode_switch_total: AtomicU64,
me_socks_kdf_strict_reject: AtomicU64, me_socks_kdf_strict_reject: AtomicU64,
me_socks_kdf_compat_fallback: AtomicU64, me_socks_kdf_compat_fallback: AtomicU64,
secure_padding_invalid: AtomicU64, secure_padding_invalid: AtomicU64,
@ -497,6 +509,93 @@ impl Stats {
self.me_route_drop_queue_full_high.fetch_add(1, Ordering::Relaxed); self.me_route_drop_queue_full_high.fetch_add(1, Ordering::Relaxed);
} }
} }
pub fn increment_me_writer_pick_success_try_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() {
return;
}
match mode {
MeWriterPickMode::SortedRr => {
self.me_writer_pick_sorted_rr_success_try_total
.fetch_add(1, Ordering::Relaxed);
}
MeWriterPickMode::P2c => {
self.me_writer_pick_p2c_success_try_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_writer_pick_success_fallback_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() {
return;
}
match mode {
MeWriterPickMode::SortedRr => {
self.me_writer_pick_sorted_rr_success_fallback_total
.fetch_add(1, Ordering::Relaxed);
}
MeWriterPickMode::P2c => {
self.me_writer_pick_p2c_success_fallback_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_writer_pick_full_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() {
return;
}
match mode {
MeWriterPickMode::SortedRr => {
self.me_writer_pick_sorted_rr_full_total
.fetch_add(1, Ordering::Relaxed);
}
MeWriterPickMode::P2c => {
self.me_writer_pick_p2c_full_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_writer_pick_closed_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() {
return;
}
match mode {
MeWriterPickMode::SortedRr => {
self.me_writer_pick_sorted_rr_closed_total
.fetch_add(1, Ordering::Relaxed);
}
MeWriterPickMode::P2c => {
self.me_writer_pick_p2c_closed_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_writer_pick_no_candidate_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() {
return;
}
match mode {
MeWriterPickMode::SortedRr => {
self.me_writer_pick_sorted_rr_no_candidate_total
.fetch_add(1, Ordering::Relaxed);
}
MeWriterPickMode::P2c => {
self.me_writer_pick_p2c_no_candidate_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_writer_pick_blocking_fallback_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_pick_blocking_fallback_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_pick_mode_switch_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_pick_mode_switch_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_socks_kdf_strict_reject(&self) { pub fn increment_me_socks_kdf_strict_reject(&self) {
if self.telemetry_me_allows_normal() { if self.telemetry_me_allows_normal() {
self.me_socks_kdf_strict_reject.fetch_add(1, Ordering::Relaxed); self.me_socks_kdf_strict_reject.fetch_add(1, Ordering::Relaxed);
@ -1001,6 +1100,52 @@ impl Stats {
pub fn get_me_route_drop_queue_full_high(&self) -> u64 { pub fn get_me_route_drop_queue_full_high(&self) -> u64 {
self.me_route_drop_queue_full_high.load(Ordering::Relaxed) self.me_route_drop_queue_full_high.load(Ordering::Relaxed)
} }
pub fn get_me_writer_pick_sorted_rr_success_try_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_success_try_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_sorted_rr_success_fallback_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_success_fallback_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_sorted_rr_full_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_full_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_sorted_rr_closed_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_closed_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_sorted_rr_no_candidate_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_no_candidate_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_p2c_success_try_total(&self) -> u64 {
self.me_writer_pick_p2c_success_try_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_p2c_success_fallback_total(&self) -> u64 {
self.me_writer_pick_p2c_success_fallback_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_p2c_full_total(&self) -> u64 {
self.me_writer_pick_p2c_full_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_p2c_closed_total(&self) -> u64 {
self.me_writer_pick_p2c_closed_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_p2c_no_candidate_total(&self) -> u64 {
self.me_writer_pick_p2c_no_candidate_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_blocking_fallback_total(&self) -> u64 {
self.me_writer_pick_blocking_fallback_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_mode_switch_total(&self) -> u64 {
self.me_writer_pick_mode_switch_total
.load(Ordering::Relaxed)
}
pub fn get_me_socks_kdf_strict_reject(&self) -> u64 { pub fn get_me_socks_kdf_strict_reject(&self) -> u64 {
self.me_socks_kdf_strict_reject.load(Ordering::Relaxed) self.me_socks_kdf_strict_reject.load(Ordering::Relaxed)
} }