Statistics on ME + Dynamic backpressure + KDF with SOCKS

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-02-28 13:18:31 +03:00
parent fa2423dadf
commit 8b39a4ef6d
No known key found for this signature in database
13 changed files with 1108 additions and 131 deletions

View File

@ -170,6 +170,18 @@ pub(crate) fn default_desync_all_full() -> bool {
false
}
pub(crate) fn default_me_route_backpressure_base_timeout_ms() -> u64 {
25
}
pub(crate) fn default_me_route_backpressure_high_timeout_ms() -> u64 {
120
}
pub(crate) fn default_me_route_backpressure_high_watermark_pct() -> u8 {
80
}
pub(crate) fn default_beobachten_minutes() -> u64 {
10
}

View File

@ -16,6 +16,7 @@
//! | `general` | `me_pool_drain_ttl_secs` | Applied on next ME map update |
//! | `general` | `me_pool_min_fresh_ratio` | Applied on next ME map update |
//! | `general` | `me_reinit_drain_timeout_secs`| Applied on next ME map update |
//! | `general` | `telemetry` / `me_*_policy` | Applied immediately |
//! | `network` | `dns_overrides` | Applied immediately |
//! | `access` | All user/quota fields | Effective immediately |
//!
@ -30,7 +31,7 @@ use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher};
use tokio::sync::{mpsc, watch};
use tracing::{error, info, warn};
use crate::config::LogLevel;
use crate::config::{LogLevel, MeSocksKdfPolicy, MeTelemetryLevel};
use super::load::ProxyConfig;
// ── Hot fields ────────────────────────────────────────────────────────────────
@ -52,6 +53,13 @@ pub struct HotFields {
pub me_keepalive_interval_secs: u64,
pub me_keepalive_jitter_secs: u64,
pub me_keepalive_payload_random: bool,
pub telemetry_core_enabled: bool,
pub telemetry_user_enabled: bool,
pub telemetry_me_level: MeTelemetryLevel,
pub me_socks_kdf_policy: MeSocksKdfPolicy,
pub me_route_backpressure_base_timeout_ms: u64,
pub me_route_backpressure_high_timeout_ms: u64,
pub me_route_backpressure_high_watermark_pct: u8,
pub access: crate::config::AccessConfig,
}
@ -72,6 +80,13 @@ impl HotFields {
me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs,
me_keepalive_jitter_secs: cfg.general.me_keepalive_jitter_secs,
me_keepalive_payload_random: cfg.general.me_keepalive_payload_random,
telemetry_core_enabled: cfg.general.telemetry.core_enabled,
telemetry_user_enabled: cfg.general.telemetry.user_enabled,
telemetry_me_level: cfg.general.telemetry.me_level,
me_socks_kdf_policy: cfg.general.me_socks_kdf_policy,
me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms,
me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms,
me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct,
access: cfg.access.clone(),
}
}
@ -262,6 +277,41 @@ fn log_changes(
);
}
if old_hot.telemetry_core_enabled != new_hot.telemetry_core_enabled
|| old_hot.telemetry_user_enabled != new_hot.telemetry_user_enabled
|| old_hot.telemetry_me_level != new_hot.telemetry_me_level
{
info!(
"config reload: telemetry: core_enabled={} user_enabled={} me_level={}",
new_hot.telemetry_core_enabled,
new_hot.telemetry_user_enabled,
new_hot.telemetry_me_level,
);
}
if old_hot.me_socks_kdf_policy != new_hot.me_socks_kdf_policy {
info!(
"config reload: me_socks_kdf_policy: {:?} → {:?}",
old_hot.me_socks_kdf_policy,
new_hot.me_socks_kdf_policy,
);
}
if old_hot.me_route_backpressure_base_timeout_ms
!= new_hot.me_route_backpressure_base_timeout_ms
|| old_hot.me_route_backpressure_high_timeout_ms
!= new_hot.me_route_backpressure_high_timeout_ms
|| old_hot.me_route_backpressure_high_watermark_pct
!= new_hot.me_route_backpressure_high_watermark_pct
{
info!(
"config reload: me_route_backpressure: base={}ms high={}ms watermark={}%",
new_hot.me_route_backpressure_base_timeout_ms,
new_hot.me_route_backpressure_high_timeout_ms,
new_hot.me_route_backpressure_high_watermark_pct,
);
}
if old_hot.access.users != new_hot.access.users {
let mut added: Vec<&String> = new_hot.access.users.keys()
.filter(|u| !old_hot.access.users.contains_key(*u))

View File

@ -311,6 +311,26 @@ impl ProxyConfig {
));
}
if config.general.me_route_backpressure_base_timeout_ms == 0 {
return Err(ProxyError::Config(
"general.me_route_backpressure_base_timeout_ms must be > 0".to_string(),
));
}
if config.general.me_route_backpressure_high_timeout_ms
< config.general.me_route_backpressure_base_timeout_ms
{
return Err(ProxyError::Config(
"general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(),
));
}
if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) {
return Err(ProxyError::Config(
"general.me_route_backpressure_high_watermark_pct must be within [1, 100]".to_string(),
));
}
if config.general.effective_me_pool_force_close_secs() > 0
&& config.general.effective_me_pool_force_close_secs()
< config.general.me_pool_drain_ttl_secs

View File

@ -59,6 +59,98 @@ impl std::fmt::Display for LogLevel {
}
}
/// Middle-End telemetry verbosity level.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum MeTelemetryLevel {
#[default]
Normal,
Silent,
Debug,
}
impl MeTelemetryLevel {
pub fn as_u8(self) -> u8 {
match self {
MeTelemetryLevel::Silent => 0,
MeTelemetryLevel::Normal => 1,
MeTelemetryLevel::Debug => 2,
}
}
pub fn from_u8(raw: u8) -> Self {
match raw {
0 => MeTelemetryLevel::Silent,
2 => MeTelemetryLevel::Debug,
_ => MeTelemetryLevel::Normal,
}
}
pub fn allows_normal(self) -> bool {
!matches!(self, MeTelemetryLevel::Silent)
}
pub fn allows_debug(self) -> bool {
matches!(self, MeTelemetryLevel::Debug)
}
}
impl std::fmt::Display for MeTelemetryLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MeTelemetryLevel::Silent => write!(f, "silent"),
MeTelemetryLevel::Normal => write!(f, "normal"),
MeTelemetryLevel::Debug => write!(f, "debug"),
}
}
}
/// Middle-End SOCKS KDF fallback policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum MeSocksKdfPolicy {
#[default]
Strict,
Compat,
}
impl MeSocksKdfPolicy {
pub fn as_u8(self) -> u8 {
match self {
MeSocksKdfPolicy::Strict => 0,
MeSocksKdfPolicy::Compat => 1,
}
}
pub fn from_u8(raw: u8) -> Self {
match raw {
1 => MeSocksKdfPolicy::Compat,
_ => MeSocksKdfPolicy::Strict,
}
}
}
/// Telemetry controls for hot-path counters and ME diagnostics.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TelemetryConfig {
#[serde(default = "default_true")]
pub core_enabled: bool,
#[serde(default = "default_true")]
pub user_enabled: bool,
#[serde(default)]
pub me_level: MeTelemetryLevel,
}
impl Default for TelemetryConfig {
fn default() -> Self {
Self {
core_enabled: default_true(),
user_enabled: default_true(),
me_level: MeTelemetryLevel::Normal,
}
}
}
// ============= Sub-Configs =============
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -288,6 +380,26 @@ pub struct GeneralConfig {
#[serde(default)]
pub disable_colors: bool,
/// Runtime telemetry controls for counters/metrics in hot paths.
#[serde(default)]
pub telemetry: TelemetryConfig,
/// SOCKS-bound KDF policy for Middle-End handshake.
#[serde(default)]
pub me_socks_kdf_policy: MeSocksKdfPolicy,
/// Base backpressure timeout in milliseconds for ME route channel send.
#[serde(default = "default_me_route_backpressure_base_timeout_ms")]
pub me_route_backpressure_base_timeout_ms: u64,
/// High backpressure timeout in milliseconds when queue occupancy is above watermark.
#[serde(default = "default_me_route_backpressure_high_timeout_ms")]
pub me_route_backpressure_high_timeout_ms: u64,
/// Queue occupancy percent threshold for high backpressure timeout.
#[serde(default = "default_me_route_backpressure_high_watermark_pct")]
pub me_route_backpressure_high_watermark_pct: u8,
/// [general.links] — proxy link generation overrides.
#[serde(default)]
pub links: LinksConfig,
@ -414,6 +526,11 @@ impl Default for GeneralConfig {
unknown_dc_log_path: default_unknown_dc_log_path(),
log_level: LogLevel::Normal,
disable_colors: false,
telemetry: TelemetryConfig::default(),
me_socks_kdf_policy: MeSocksKdfPolicy::Strict,
me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(),
me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(),
me_route_backpressure_high_watermark_pct: default_me_route_backpressure_high_watermark_pct(),
links: LinksConfig::default(),
crypto_pending_buffer: default_crypto_pending_buffer(),
max_client_frame: default_max_client_frame(),

View File

@ -36,6 +36,7 @@ use crate::ip_tracker::UserIpTracker;
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
use crate::proxy::ClientHandler;
use crate::stats::beobachten::BeobachtenStore;
use crate::stats::telemetry::TelemetryPolicy;
use crate::stats::{ReplayChecker, Stats};
use crate::stream::BufferPool;
use crate::transport::middle_proxy::{
@ -406,6 +407,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let prefer_ipv6 = decision.prefer_ipv6();
let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me);
let stats = Arc::new(Stats::new());
stats.apply_telemetry_policy(TelemetryPolicy::from_config(&config.general.telemetry));
let beobachten = Arc::new(BeobachtenStore::new());
let rng = Arc::new(SecureRandom::new());
@ -539,6 +541,10 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
config.general.me_hardswap_warmup_delay_max_ms,
config.general.me_hardswap_warmup_extra_passes,
config.general.me_hardswap_warmup_pass_backoff_base_ms,
config.general.me_socks_kdf_policy,
config.general.me_route_backpressure_base_timeout_ms,
config.general.me_route_backpressure_high_timeout_ms,
config.general.me_route_backpressure_high_watermark_pct,
);
let pool_size = config.general.middle_proxy_pool_size.max(1);
@ -794,6 +800,27 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
detected_ip_v6,
);
let stats_policy = stats.clone();
let mut config_rx_policy = config_rx.clone();
let me_pool_policy = me_pool.clone();
tokio::spawn(async move {
loop {
if config_rx_policy.changed().await.is_err() {
break;
}
let cfg = config_rx_policy.borrow_and_update().clone();
stats_policy.apply_telemetry_policy(TelemetryPolicy::from_config(&cfg.general.telemetry));
if let Some(pool) = &me_pool_policy {
pool.update_runtime_transport_policy(
cfg.general.me_socks_kdf_policy,
cfg.general.me_route_backpressure_base_timeout_ms,
cfg.general.me_route_backpressure_high_timeout_ms,
cfg.general.me_route_backpressure_high_watermark_pct,
);
}
}
});
let beobachten_writer = beobachten.clone();
let config_rx_beobachten = config_rx.clone();
tokio::spawn(async move {

View File

@ -118,120 +118,394 @@ fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> Stri
async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIpTracker) -> String {
use std::fmt::Write;
let mut out = String::with_capacity(4096);
let telemetry = stats.telemetry_policy();
let core_enabled = telemetry.core_enabled;
let user_enabled = telemetry.user_enabled;
let me_allows_normal = telemetry.me_level.allows_normal();
let me_allows_debug = telemetry.me_level.allows_debug();
let _ = writeln!(out, "# HELP telemt_uptime_seconds Proxy uptime");
let _ = writeln!(out, "# TYPE telemt_uptime_seconds gauge");
let _ = writeln!(out, "telemt_uptime_seconds {:.1}", stats.uptime_secs());
let _ = writeln!(out, "# HELP telemt_telemetry_core_enabled Runtime core telemetry switch");
let _ = writeln!(out, "# TYPE telemt_telemetry_core_enabled gauge");
let _ = writeln!(
out,
"telemt_telemetry_core_enabled {}",
if core_enabled { 1 } else { 0 }
);
let _ = writeln!(out, "# HELP telemt_telemetry_user_enabled Runtime per-user telemetry switch");
let _ = writeln!(out, "# TYPE telemt_telemetry_user_enabled gauge");
let _ = writeln!(
out,
"telemt_telemetry_user_enabled {}",
if user_enabled { 1 } else { 0 }
);
let _ = writeln!(out, "# HELP telemt_telemetry_me_level Runtime ME telemetry level flag");
let _ = writeln!(out, "# TYPE telemt_telemetry_me_level gauge");
let _ = writeln!(
out,
"telemt_telemetry_me_level{{level=\"silent\"}} {}",
if matches!(telemetry.me_level, crate::config::MeTelemetryLevel::Silent) {
1
} else {
0
}
);
let _ = writeln!(
out,
"telemt_telemetry_me_level{{level=\"normal\"}} {}",
if matches!(telemetry.me_level, crate::config::MeTelemetryLevel::Normal) {
1
} else {
0
}
);
let _ = writeln!(
out,
"telemt_telemetry_me_level{{level=\"debug\"}} {}",
if matches!(telemetry.me_level, crate::config::MeTelemetryLevel::Debug) {
1
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_connections_total Total accepted connections");
let _ = writeln!(out, "# TYPE telemt_connections_total counter");
let _ = writeln!(out, "telemt_connections_total {}", stats.get_connects_all());
let _ = writeln!(
out,
"telemt_connections_total {}",
if core_enabled { stats.get_connects_all() } else { 0 }
);
let _ = writeln!(out, "# HELP telemt_connections_bad_total Bad/rejected connections");
let _ = writeln!(out, "# TYPE telemt_connections_bad_total counter");
let _ = writeln!(out, "telemt_connections_bad_total {}", stats.get_connects_bad());
let _ = writeln!(
out,
"telemt_connections_bad_total {}",
if core_enabled { stats.get_connects_bad() } else { 0 }
);
let _ = writeln!(out, "# HELP telemt_handshake_timeouts_total Handshake timeouts");
let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter");
let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts());
let _ = writeln!(
out,
"telemt_handshake_timeouts_total {}",
if core_enabled {
stats.get_handshake_timeouts()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_keepalive_sent_total ME keepalive frames sent");
let _ = writeln!(out, "# TYPE telemt_me_keepalive_sent_total counter");
let _ = writeln!(out, "telemt_me_keepalive_sent_total {}", stats.get_me_keepalive_sent());
let _ = writeln!(
out,
"telemt_me_keepalive_sent_total {}",
if me_allows_debug {
stats.get_me_keepalive_sent()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_keepalive_failed_total ME keepalive send failures");
let _ = writeln!(out, "# TYPE telemt_me_keepalive_failed_total counter");
let _ = writeln!(out, "telemt_me_keepalive_failed_total {}", stats.get_me_keepalive_failed());
let _ = writeln!(
out,
"telemt_me_keepalive_failed_total {}",
if me_allows_normal {
stats.get_me_keepalive_failed()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_keepalive_pong_total ME keepalive pong replies");
let _ = writeln!(out, "# TYPE telemt_me_keepalive_pong_total counter");
let _ = writeln!(out, "telemt_me_keepalive_pong_total {}", stats.get_me_keepalive_pong());
let _ = writeln!(
out,
"telemt_me_keepalive_pong_total {}",
if me_allows_debug {
stats.get_me_keepalive_pong()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_keepalive_timeout_total ME keepalive ping timeouts");
let _ = writeln!(out, "# TYPE telemt_me_keepalive_timeout_total counter");
let _ = writeln!(out, "telemt_me_keepalive_timeout_total {}", stats.get_me_keepalive_timeout());
let _ = writeln!(
out,
"telemt_me_keepalive_timeout_total {}",
if me_allows_normal {
stats.get_me_keepalive_timeout()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts");
let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter");
let _ = writeln!(out, "telemt_me_reconnect_attempts_total {}", stats.get_me_reconnect_attempts());
let _ = writeln!(
out,
"telemt_me_reconnect_attempts_total {}",
if me_allows_normal {
stats.get_me_reconnect_attempts()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_reconnect_success_total ME reconnect successes");
let _ = writeln!(out, "# TYPE telemt_me_reconnect_success_total counter");
let _ = writeln!(out, "telemt_me_reconnect_success_total {}", stats.get_me_reconnect_success());
let _ = writeln!(
out,
"telemt_me_reconnect_success_total {}",
if me_allows_normal {
stats.get_me_reconnect_success()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_crc_mismatch_total ME CRC mismatches");
let _ = writeln!(out, "# TYPE telemt_me_crc_mismatch_total counter");
let _ = writeln!(out, "telemt_me_crc_mismatch_total {}", stats.get_me_crc_mismatch());
let _ = writeln!(
out,
"telemt_me_crc_mismatch_total {}",
if me_allows_normal {
stats.get_me_crc_mismatch()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_seq_mismatch_total ME sequence mismatches");
let _ = writeln!(out, "# TYPE telemt_me_seq_mismatch_total counter");
let _ = writeln!(out, "telemt_me_seq_mismatch_total {}", stats.get_me_seq_mismatch());
let _ = writeln!(
out,
"telemt_me_seq_mismatch_total {}",
if me_allows_normal {
stats.get_me_seq_mismatch()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_route_drop_no_conn_total ME route drops: no conn");
let _ = writeln!(out, "# TYPE telemt_me_route_drop_no_conn_total counter");
let _ = writeln!(out, "telemt_me_route_drop_no_conn_total {}", stats.get_me_route_drop_no_conn());
let _ = writeln!(
out,
"telemt_me_route_drop_no_conn_total {}",
if me_allows_normal {
stats.get_me_route_drop_no_conn()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_route_drop_channel_closed_total ME route drops: channel closed");
let _ = writeln!(out, "# TYPE telemt_me_route_drop_channel_closed_total counter");
let _ = writeln!(out, "telemt_me_route_drop_channel_closed_total {}", stats.get_me_route_drop_channel_closed());
let _ = writeln!(
out,
"telemt_me_route_drop_channel_closed_total {}",
if me_allows_normal {
stats.get_me_route_drop_channel_closed()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_route_drop_queue_full_total ME route drops: queue full");
let _ = writeln!(out, "# TYPE telemt_me_route_drop_queue_full_total counter");
let _ = writeln!(out, "telemt_me_route_drop_queue_full_total {}", stats.get_me_route_drop_queue_full());
let _ = writeln!(
out,
"telemt_me_route_drop_queue_full_total {}",
if me_allows_normal {
stats.get_me_route_drop_queue_full()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_route_drop_queue_full_profile_total ME route drops: queue full by adaptive profile"
);
let _ = writeln!(
out,
"# TYPE telemt_me_route_drop_queue_full_profile_total counter"
);
let _ = writeln!(
out,
"telemt_me_route_drop_queue_full_profile_total{{profile=\"base\"}} {}",
if me_allows_normal {
stats.get_me_route_drop_queue_full_base()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_route_drop_queue_full_profile_total{{profile=\"high\"}} {}",
if me_allows_normal {
stats.get_me_route_drop_queue_full_high()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_socks_kdf_policy_total SOCKS KDF policy outcomes"
);
let _ = writeln!(out, "# TYPE telemt_me_socks_kdf_policy_total counter");
let _ = writeln!(
out,
"telemt_me_socks_kdf_policy_total{{policy=\"strict\",outcome=\"reject\"}} {}",
if me_allows_normal {
stats.get_me_socks_kdf_strict_reject()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_socks_kdf_policy_total{{policy=\"compat\",outcome=\"fallback\"}} {}",
if me_allows_debug {
stats.get_me_socks_kdf_compat_fallback()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths");
let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
let _ = writeln!(out, "telemt_secure_padding_invalid_total {}", stats.get_secure_padding_invalid());
let _ = writeln!(
out,
"telemt_secure_padding_invalid_total {}",
if me_allows_normal {
stats.get_secure_padding_invalid()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_desync_total Total crypto-desync detections");
let _ = writeln!(out, "# TYPE telemt_desync_total counter");
let _ = writeln!(out, "telemt_desync_total {}", stats.get_desync_total());
let _ = writeln!(
out,
"telemt_desync_total {}",
if me_allows_normal {
stats.get_desync_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_desync_full_logged_total Full forensic desync logs emitted");
let _ = writeln!(out, "# TYPE telemt_desync_full_logged_total counter");
let _ = writeln!(out, "telemt_desync_full_logged_total {}", stats.get_desync_full_logged());
let _ = writeln!(
out,
"telemt_desync_full_logged_total {}",
if me_allows_normal {
stats.get_desync_full_logged()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_desync_suppressed_total Suppressed desync forensic events");
let _ = writeln!(out, "# TYPE telemt_desync_suppressed_total counter");
let _ = writeln!(out, "telemt_desync_suppressed_total {}", stats.get_desync_suppressed());
let _ = writeln!(
out,
"telemt_desync_suppressed_total {}",
if me_allows_normal {
stats.get_desync_suppressed()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_desync_frames_bucket_total Desync count by frames_ok bucket");
let _ = writeln!(out, "# TYPE telemt_desync_frames_bucket_total counter");
let _ = writeln!(
out,
"telemt_desync_frames_bucket_total{{bucket=\"0\"}} {}",
stats.get_desync_frames_bucket_0()
if me_allows_normal {
stats.get_desync_frames_bucket_0()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_desync_frames_bucket_total{{bucket=\"1_2\"}} {}",
stats.get_desync_frames_bucket_1_2()
if me_allows_normal {
stats.get_desync_frames_bucket_1_2()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_desync_frames_bucket_total{{bucket=\"3_10\"}} {}",
stats.get_desync_frames_bucket_3_10()
if me_allows_normal {
stats.get_desync_frames_bucket_3_10()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_desync_frames_bucket_total{{bucket=\"gt_10\"}} {}",
stats.get_desync_frames_bucket_gt_10()
if me_allows_normal {
stats.get_desync_frames_bucket_gt_10()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_pool_swap_total Successful ME pool swaps");
let _ = writeln!(out, "# TYPE telemt_pool_swap_total counter");
let _ = writeln!(out, "telemt_pool_swap_total {}", stats.get_pool_swap_total());
let _ = writeln!(
out,
"telemt_pool_swap_total {}",
if me_allows_debug {
stats.get_pool_swap_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_pool_drain_active Active draining ME writers");
let _ = writeln!(out, "# TYPE telemt_pool_drain_active gauge");
let _ = writeln!(out, "telemt_pool_drain_active {}", stats.get_pool_drain_active());
let _ = writeln!(
out,
"telemt_pool_drain_active {}",
if me_allows_debug {
stats.get_pool_drain_active()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_pool_force_close_total Forced close events for draining writers");
let _ = writeln!(out, "# TYPE telemt_pool_force_close_total counter");
let _ = writeln!(
out,
"telemt_pool_force_close_total {}",
stats.get_pool_force_close_total()
if me_allows_normal {
stats.get_pool_force_close_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_pool_stale_pick_total Stale writer fallback picks for new binds");
@ -239,7 +513,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let _ = writeln!(
out,
"telemt_pool_stale_pick_total {}",
stats.get_pool_stale_pick_total()
if me_allows_normal {
stats.get_pool_stale_pick_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
@ -247,7 +525,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let _ = writeln!(
out,
"telemt_me_writer_removed_total {}",
stats.get_me_writer_removed_total()
if me_allows_debug {
stats.get_me_writer_removed_total()
} else {
0
}
);
let _ = writeln!(
@ -258,7 +540,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let _ = writeln!(
out,
"telemt_me_writer_removed_unexpected_total {}",
stats.get_me_writer_removed_unexpected_total()
if me_allows_normal {
stats.get_me_writer_removed_unexpected_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started");
@ -266,7 +552,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let _ = writeln!(
out,
"telemt_me_refill_triggered_total {}",
stats.get_me_refill_triggered_total()
if me_allows_debug {
stats.get_me_refill_triggered_total()
} else {
0
}
);
let _ = writeln!(
@ -277,7 +567,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let _ = writeln!(
out,
"telemt_me_refill_skipped_inflight_total {}",
stats.get_me_refill_skipped_inflight_total()
if me_allows_debug {
stats.get_me_refill_skipped_inflight_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_refill_failed_total Immediate ME refill failures");
@ -285,7 +579,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let _ = writeln!(
out,
"telemt_me_refill_failed_total {}",
stats.get_me_refill_failed_total()
if me_allows_normal {
stats.get_me_refill_failed_total()
} else {
0
}
);
let _ = writeln!(
@ -296,7 +594,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let _ = writeln!(
out,
"telemt_me_writer_restored_same_endpoint_total {}",
stats.get_me_writer_restored_same_endpoint_total()
if me_allows_normal {
stats.get_me_writer_restored_same_endpoint_total()
} else {
0
}
);
let _ = writeln!(
@ -307,16 +609,24 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let _ = writeln!(
out,
"telemt_me_writer_restored_fallback_total {}",
stats.get_me_writer_restored_fallback_total()
if me_allows_normal {
stats.get_me_writer_restored_fallback_total()
} else {
0
}
);
let unresolved_writer_losses = stats
.get_me_writer_removed_unexpected_total()
.saturating_sub(
stats
.get_me_writer_restored_same_endpoint_total()
.saturating_add(stats.get_me_writer_restored_fallback_total()),
);
let unresolved_writer_losses = if me_allows_normal {
stats
.get_me_writer_removed_unexpected_total()
.saturating_sub(
stats
.get_me_writer_restored_same_endpoint_total()
.saturating_add(stats.get_me_writer_restored_fallback_total()),
)
} else {
0
};
let _ = writeln!(
out,
"# HELP telemt_me_writer_removed_unexpected_minus_restored_total Unexpected writer removals not yet compensated by restore"
@ -343,51 +653,63 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let _ = writeln!(out, "# TYPE telemt_user_msgs_from_client counter");
let _ = writeln!(out, "# HELP telemt_user_msgs_to_client Per-user messages sent");
let _ = writeln!(out, "# TYPE telemt_user_msgs_to_client counter");
let _ = writeln!(
out,
"# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
);
let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_suppressed gauge");
let _ = writeln!(
out,
"telemt_telemetry_user_series_suppressed {}",
if user_enabled { 0 } else { 1 }
);
for entry in stats.iter_user_stats() {
let user = entry.key();
let s = entry.value();
let _ = writeln!(out, "telemt_user_connections_total{{user=\"{}\"}} {}", user, s.connects.load(std::sync::atomic::Ordering::Relaxed));
let _ = writeln!(out, "telemt_user_connections_current{{user=\"{}\"}} {}", user, s.curr_connects.load(std::sync::atomic::Ordering::Relaxed));
let _ = writeln!(out, "telemt_user_octets_from_client{{user=\"{}\"}} {}", user, s.octets_from_client.load(std::sync::atomic::Ordering::Relaxed));
let _ = writeln!(out, "telemt_user_octets_to_client{{user=\"{}\"}} {}", user, s.octets_to_client.load(std::sync::atomic::Ordering::Relaxed));
let _ = writeln!(out, "telemt_user_msgs_from_client{{user=\"{}\"}} {}", user, s.msgs_from_client.load(std::sync::atomic::Ordering::Relaxed));
let _ = writeln!(out, "telemt_user_msgs_to_client{{user=\"{}\"}} {}", user, s.msgs_to_client.load(std::sync::atomic::Ordering::Relaxed));
}
if user_enabled {
for entry in stats.iter_user_stats() {
let user = entry.key();
let s = entry.value();
let _ = writeln!(out, "telemt_user_connections_total{{user=\"{}\"}} {}", user, s.connects.load(std::sync::atomic::Ordering::Relaxed));
let _ = writeln!(out, "telemt_user_connections_current{{user=\"{}\"}} {}", user, s.curr_connects.load(std::sync::atomic::Ordering::Relaxed));
let _ = writeln!(out, "telemt_user_octets_from_client{{user=\"{}\"}} {}", user, s.octets_from_client.load(std::sync::atomic::Ordering::Relaxed));
let _ = writeln!(out, "telemt_user_octets_to_client{{user=\"{}\"}} {}", user, s.octets_to_client.load(std::sync::atomic::Ordering::Relaxed));
let _ = writeln!(out, "telemt_user_msgs_from_client{{user=\"{}\"}} {}", user, s.msgs_from_client.load(std::sync::atomic::Ordering::Relaxed));
let _ = writeln!(out, "telemt_user_msgs_to_client{{user=\"{}\"}} {}", user, s.msgs_to_client.load(std::sync::atomic::Ordering::Relaxed));
}
let ip_stats = ip_tracker.get_stats().await;
let ip_counts: HashMap<String, usize> = ip_stats
.into_iter()
.map(|(user, count, _)| (user, count))
.collect();
let ip_stats = ip_tracker.get_stats().await;
let ip_counts: HashMap<String, usize> = ip_stats
.into_iter()
.map(|(user, count, _)| (user, count))
.collect();
let mut unique_users = BTreeSet::new();
unique_users.extend(config.access.user_max_unique_ips.keys().cloned());
unique_users.extend(ip_counts.keys().cloned());
let mut unique_users = BTreeSet::new();
unique_users.extend(config.access.user_max_unique_ips.keys().cloned());
unique_users.extend(ip_counts.keys().cloned());
let _ = writeln!(out, "# HELP telemt_user_unique_ips_current Per-user current number of unique active IPs");
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_current gauge");
let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)");
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge");
let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)");
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge");
let _ = writeln!(out, "# HELP telemt_user_unique_ips_current Per-user current number of unique active IPs");
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_current gauge");
let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)");
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge");
let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)");
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge");
for user in unique_users {
let current = ip_counts.get(&user).copied().unwrap_or(0);
let limit = config.access.user_max_unique_ips.get(&user).copied().unwrap_or(0);
let utilization = if limit > 0 {
current as f64 / limit as f64
} else {
0.0
};
let _ = writeln!(out, "telemt_user_unique_ips_current{{user=\"{}\"}} {}", user, current);
let _ = writeln!(out, "telemt_user_unique_ips_limit{{user=\"{}\"}} {}", user, limit);
let _ = writeln!(
out,
"telemt_user_unique_ips_utilization{{user=\"{}\"}} {:.6}",
user,
utilization
);
for user in unique_users {
let current = ip_counts.get(&user).copied().unwrap_or(0);
let limit = config.access.user_max_unique_ips.get(&user).copied().unwrap_or(0);
let utilization = if limit > 0 {
current as f64 / limit as f64
} else {
0.0
};
let _ = writeln!(out, "telemt_user_unique_ips_current{{user=\"{}\"}} {}", user, current);
let _ = writeln!(out, "telemt_user_unique_ips_limit{{user=\"{}\"}} {}", user, limit);
let _ = writeln!(
out,
"telemt_user_unique_ips_utilization{{user=\"{}\"}} {:.6}",
user,
utilization
);
}
}
out

View File

@ -3,8 +3,9 @@
#![allow(dead_code)]
pub mod beobachten;
pub mod telemetry;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
use std::time::{Instant, Duration};
use dashmap::DashMap;
use parking_lot::Mutex;
@ -15,6 +16,9 @@ use std::collections::hash_map::DefaultHasher;
use std::collections::VecDeque;
use tracing::debug;
use crate::config::MeTelemetryLevel;
use self::telemetry::TelemetryPolicy;
// ============= Stats =============
#[derive(Default)]
@ -33,6 +37,10 @@ pub struct Stats {
me_route_drop_no_conn: AtomicU64,
me_route_drop_channel_closed: AtomicU64,
me_route_drop_queue_full: AtomicU64,
me_route_drop_queue_full_base: AtomicU64,
me_route_drop_queue_full_high: AtomicU64,
me_socks_kdf_strict_reject: AtomicU64,
me_socks_kdf_compat_fallback: AtomicU64,
secure_padding_invalid: AtomicU64,
desync_total: AtomicU64,
desync_full_logged: AtomicU64,
@ -52,6 +60,9 @@ pub struct Stats {
me_refill_failed_total: AtomicU64,
me_writer_restored_same_endpoint_total: AtomicU64,
me_writer_restored_fallback_total: AtomicU64,
telemetry_core_enabled: AtomicBool,
telemetry_user_enabled: AtomicBool,
telemetry_me_level: AtomicU8,
user_stats: DashMap<String, UserStats>,
start_time: parking_lot::RwLock<Option<Instant>>,
}
@ -69,44 +80,167 @@ pub struct UserStats {
impl Stats {
pub fn new() -> Self {
let stats = Self::default();
stats.apply_telemetry_policy(TelemetryPolicy::default());
*stats.start_time.write() = Some(Instant::now());
stats
}
pub fn increment_connects_all(&self) { self.connects_all.fetch_add(1, Ordering::Relaxed); }
pub fn increment_connects_bad(&self) { self.connects_bad.fetch_add(1, Ordering::Relaxed); }
pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_sent(&self) { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_failed(&self) { self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_pong(&self) { self.me_keepalive_pong.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_timeout(&self) { self.me_keepalive_timeout.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_keepalive_timeout_by(&self, value: u64) {
self.me_keepalive_timeout.fetch_add(value, Ordering::Relaxed);
fn telemetry_me_level(&self) -> MeTelemetryLevel {
MeTelemetryLevel::from_u8(self.telemetry_me_level.load(Ordering::Relaxed))
}
fn telemetry_core_enabled(&self) -> bool {
self.telemetry_core_enabled.load(Ordering::Relaxed)
}
fn telemetry_user_enabled(&self) -> bool {
self.telemetry_user_enabled.load(Ordering::Relaxed)
}
fn telemetry_me_allows_normal(&self) -> bool {
self.telemetry_me_level().allows_normal()
}
fn telemetry_me_allows_debug(&self) -> bool {
self.telemetry_me_level().allows_debug()
}
pub fn apply_telemetry_policy(&self, policy: TelemetryPolicy) {
self.telemetry_core_enabled
.store(policy.core_enabled, Ordering::Relaxed);
self.telemetry_user_enabled
.store(policy.user_enabled, Ordering::Relaxed);
self.telemetry_me_level
.store(policy.me_level.as_u8(), Ordering::Relaxed);
}
pub fn telemetry_policy(&self) -> TelemetryPolicy {
TelemetryPolicy {
core_enabled: self.telemetry_core_enabled(),
user_enabled: self.telemetry_user_enabled(),
me_level: self.telemetry_me_level(),
}
}
pub fn increment_connects_all(&self) {
if self.telemetry_core_enabled() {
self.connects_all.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_connects_bad(&self) {
if self.telemetry_core_enabled() {
self.connects_bad.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_handshake_timeouts(&self) {
if self.telemetry_core_enabled() {
self.handshake_timeouts.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_keepalive_sent(&self) {
if self.telemetry_me_allows_debug() {
self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_keepalive_failed(&self) {
if self.telemetry_me_allows_normal() {
self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_keepalive_pong(&self) {
if self.telemetry_me_allows_debug() {
self.me_keepalive_pong.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_keepalive_timeout(&self) {
if self.telemetry_me_allows_normal() {
self.me_keepalive_timeout.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_keepalive_timeout_by(&self, value: u64) {
if self.telemetry_me_allows_normal() {
self.me_keepalive_timeout.fetch_add(value, Ordering::Relaxed);
}
}
pub fn increment_me_reconnect_attempt(&self) {
if self.telemetry_me_allows_normal() {
self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_reconnect_success(&self) {
if self.telemetry_me_allows_normal() {
self.me_reconnect_success.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_crc_mismatch(&self) {
if self.telemetry_me_allows_normal() {
self.me_crc_mismatch.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_seq_mismatch(&self) {
if self.telemetry_me_allows_normal() {
self.me_seq_mismatch.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_route_drop_no_conn(&self) {
if self.telemetry_me_allows_normal() {
self.me_route_drop_no_conn.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_reconnect_attempt(&self) { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_reconnect_success(&self) { self.me_reconnect_success.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_crc_mismatch(&self) { self.me_crc_mismatch.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_seq_mismatch(&self) { self.me_seq_mismatch.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_route_drop_no_conn(&self) { self.me_route_drop_no_conn.fetch_add(1, Ordering::Relaxed); }
pub fn increment_me_route_drop_channel_closed(&self) {
self.me_route_drop_channel_closed.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_normal() {
self.me_route_drop_channel_closed.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_route_drop_queue_full(&self) {
self.me_route_drop_queue_full.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_normal() {
self.me_route_drop_queue_full.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_route_drop_queue_full_base(&self) {
if self.telemetry_me_allows_normal() {
self.me_route_drop_queue_full_base.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_route_drop_queue_full_high(&self) {
if self.telemetry_me_allows_normal() {
self.me_route_drop_queue_full_high.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_socks_kdf_strict_reject(&self) {
if self.telemetry_me_allows_normal() {
self.me_socks_kdf_strict_reject.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_socks_kdf_compat_fallback(&self) {
if self.telemetry_me_allows_debug() {
self.me_socks_kdf_compat_fallback.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_secure_padding_invalid(&self) {
self.secure_padding_invalid.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_normal() {
self.secure_padding_invalid.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_desync_total(&self) {
self.desync_total.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_normal() {
self.desync_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_desync_full_logged(&self) {
self.desync_full_logged.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_normal() {
self.desync_full_logged.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_desync_suppressed(&self) {
self.desync_suppressed.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_normal() {
self.desync_suppressed.fetch_add(1, Ordering::Relaxed);
}
}
pub fn observe_desync_frames_ok(&self, frames_ok: u64) {
if !self.telemetry_me_allows_normal() {
return;
}
match frames_ok {
0 => {
self.desync_frames_bucket_0.fetch_add(1, Ordering::Relaxed);
@ -123,12 +257,19 @@ impl Stats {
}
}
pub fn increment_pool_swap_total(&self) {
self.pool_swap_total.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_debug() {
self.pool_swap_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_pool_drain_active(&self) {
self.pool_drain_active.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_debug() {
self.pool_drain_active.fetch_add(1, Ordering::Relaxed);
}
}
pub fn decrement_pool_drain_active(&self) {
if !self.telemetry_me_allows_debug() {
return;
}
let mut current = self.pool_drain_active.load(Ordering::Relaxed);
loop {
if current == 0 {
@ -146,31 +287,51 @@ impl Stats {
}
}
pub fn increment_pool_force_close_total(&self) {
self.pool_force_close_total.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_normal() {
self.pool_force_close_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_pool_stale_pick_total(&self) {
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_normal() {
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_removed_total(&self) {
self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_debug() {
self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_removed_unexpected_total(&self) {
self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_normal() {
self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_refill_triggered_total(&self) {
self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_debug() {
self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_refill_skipped_inflight_total(&self) {
self.me_refill_skipped_inflight_total.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_debug() {
self.me_refill_skipped_inflight_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_refill_failed_total(&self) {
self.me_refill_failed_total.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_normal() {
self.me_refill_failed_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_restored_same_endpoint_total(&self) {
self.me_writer_restored_same_endpoint_total.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_normal() {
self.me_writer_restored_same_endpoint_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_restored_fallback_total(&self) {
self.me_writer_restored_fallback_total.fetch_add(1, Ordering::Relaxed);
if self.telemetry_me_allows_normal() {
self.me_writer_restored_fallback_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
@ -189,6 +350,18 @@ impl Stats {
pub fn get_me_route_drop_queue_full(&self) -> u64 {
self.me_route_drop_queue_full.load(Ordering::Relaxed)
}
pub fn get_me_route_drop_queue_full_base(&self) -> u64 {
self.me_route_drop_queue_full_base.load(Ordering::Relaxed)
}
pub fn get_me_route_drop_queue_full_high(&self) -> u64 {
self.me_route_drop_queue_full_high.load(Ordering::Relaxed)
}
pub fn get_me_socks_kdf_strict_reject(&self) -> u64 {
self.me_socks_kdf_strict_reject.load(Ordering::Relaxed)
}
pub fn get_me_socks_kdf_compat_fallback(&self) -> u64 {
self.me_socks_kdf_compat_fallback.load(Ordering::Relaxed)
}
pub fn get_secure_padding_invalid(&self) -> u64 {
self.secure_padding_invalid.load(Ordering::Relaxed)
}
@ -248,11 +421,17 @@ impl Stats {
}
pub fn increment_user_connects(&self, user: &str) {
if !self.telemetry_user_enabled() {
return;
}
self.user_stats.entry(user.to_string()).or_default()
.connects.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_user_curr_connects(&self, user: &str) {
if !self.telemetry_user_enabled() {
return;
}
self.user_stats.entry(user.to_string()).or_default()
.curr_connects.fetch_add(1, Ordering::Relaxed);
}
@ -285,21 +464,33 @@ impl Stats {
}
pub fn add_user_octets_from(&self, user: &str, bytes: u64) {
if !self.telemetry_user_enabled() {
return;
}
self.user_stats.entry(user.to_string()).or_default()
.octets_from_client.fetch_add(bytes, Ordering::Relaxed);
}
pub fn add_user_octets_to(&self, user: &str, bytes: u64) {
if !self.telemetry_user_enabled() {
return;
}
self.user_stats.entry(user.to_string()).or_default()
.octets_to_client.fetch_add(bytes, Ordering::Relaxed);
}
pub fn increment_user_msgs_from(&self, user: &str) {
if !self.telemetry_user_enabled() {
return;
}
self.user_stats.entry(user.to_string()).or_default()
.msgs_from_client.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_user_msgs_to(&self, user: &str) {
if !self.telemetry_user_enabled() {
return;
}
self.user_stats.entry(user.to_string()).or_default()
.msgs_to_client.fetch_add(1, Ordering::Relaxed);
}
@ -548,6 +739,7 @@ impl ReplayStats {
#[cfg(test)]
mod tests {
use super::*;
use crate::config::MeTelemetryLevel;
use std::sync::Arc;
#[test]
@ -558,6 +750,40 @@ mod tests {
stats.increment_connects_all();
assert_eq!(stats.get_connects_all(), 3);
}
#[test]
fn test_telemetry_policy_disables_core_and_user_counters() {
let stats = Stats::new();
stats.apply_telemetry_policy(TelemetryPolicy {
core_enabled: false,
user_enabled: false,
me_level: MeTelemetryLevel::Normal,
});
stats.increment_connects_all();
stats.increment_user_connects("alice");
stats.add_user_octets_from("alice", 1024);
assert_eq!(stats.get_connects_all(), 0);
assert_eq!(stats.get_user_curr_connects("alice"), 0);
assert_eq!(stats.get_user_total_octets("alice"), 0);
}
#[test]
fn test_telemetry_policy_me_silent_blocks_me_counters() {
let stats = Stats::new();
stats.apply_telemetry_policy(TelemetryPolicy {
core_enabled: true,
user_enabled: true,
me_level: MeTelemetryLevel::Silent,
});
stats.increment_me_crc_mismatch();
stats.increment_me_keepalive_sent();
stats.increment_me_route_drop_queue_full();
assert_eq!(stats.get_me_crc_mismatch(), 0);
assert_eq!(stats.get_me_keepalive_sent(), 0);
assert_eq!(stats.get_me_route_drop_queue_full(), 0);
}
#[test]
fn test_replay_checker_basic() {

29
src/stats/telemetry.rs Normal file
View File

@ -0,0 +1,29 @@
use crate::config::{MeTelemetryLevel, TelemetryConfig};
/// Runtime telemetry policy used by hot-path counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TelemetryPolicy {
pub core_enabled: bool,
pub user_enabled: bool,
pub me_level: MeTelemetryLevel,
}
impl Default for TelemetryPolicy {
fn default() -> Self {
Self {
core_enabled: true,
user_enabled: true,
me_level: MeTelemetryLevel::Normal,
}
}
}
impl TelemetryPolicy {
pub fn from_config(cfg: &TelemetryConfig) -> Self {
Self {
core_enabled: cfg.core_enabled,
user_enabled: cfg.user_enabled,
me_level: cfg.me_level,
}
}
}

View File

@ -14,6 +14,7 @@ use tokio::net::{TcpStream, TcpSocket};
use tokio::time::timeout;
use tracing::{debug, info, warn};
use crate::config::MeSocksKdfPolicy;
use crate::crypto::{SecureRandom, build_middleproxy_prekey, derive_middleproxy_keys, sha256};
use crate::error::{ProxyError, Result};
use crate::network::IpFamily;
@ -117,6 +118,13 @@ impl MePool {
Some(bound)
}
fn is_socks_route(upstream_egress: Option<UpstreamEgressInfo>) -> bool {
matches!(
upstream_egress.map(|info| info.route_kind),
Some(UpstreamRouteKind::Socks4 | UpstreamRouteKind::Socks5)
)
}
/// TCP connect with timeout + return RTT in milliseconds.
pub(crate) async fn connect_tcp(
&self,
@ -125,14 +133,7 @@ impl MePool {
let start = Instant::now();
let (stream, upstream_egress) = if let Some(upstream) = &self.upstream {
let dc_idx = self.resolve_dc_idx_for_endpoint(addr).await;
let (stream, egress) = timeout(
Duration::from_secs(ME_CONNECT_TIMEOUT_SECS),
upstream.connect_with_details(addr, dc_idx, None),
)
.await
.map_err(|_| ProxyError::ConnectionTimeout {
addr: addr.to_string(),
})??;
let (stream, egress) = upstream.connect_with_details(addr, dc_idx, None).await?;
(stream, Some(egress))
} else {
let connect_fut = async {
@ -226,9 +227,29 @@ impl MePool {
} else {
IpFamily::V6
};
let is_socks_route = Self::is_socks_route(upstream_egress);
let socks_bound_addr = Self::select_socks_bound_addr(family, upstream_egress);
let reflected = if let Some(bound) = socks_bound_addr {
Some(bound)
} else if is_socks_route {
match self.socks_kdf_policy() {
MeSocksKdfPolicy::Strict => {
self.stats.increment_me_socks_kdf_strict_reject();
return Err(ProxyError::InvalidHandshake(
"SOCKS route returned no valid BND.ADDR for ME KDF (strict policy)"
.to_string(),
));
}
MeSocksKdfPolicy::Compat => {
self.stats.increment_me_socks_kdf_compat_fallback();
if self.nat_probe {
let bind_ip = Self::direct_bind_ip_for_stun(family, upstream_egress);
self.maybe_reflect_public_addr(family, bind_ip).await
} else {
None
}
}
}
} else if self.nat_probe {
let bind_ip = Self::direct_bind_ip_for_stun(family, upstream_egress);
self.maybe_reflect_public_addr(family, bind_ip).await

View File

@ -7,6 +7,7 @@ use tokio::net::UdpSocket;
use crate::config::{UpstreamConfig, UpstreamType};
use crate::crypto::SecureRandom;
use crate::error::ProxyError;
use crate::transport::{UpstreamEgressInfo, UpstreamRouteKind};
use super::MePool;
@ -20,6 +21,7 @@ pub enum MePingFamily {
pub struct MePingSample {
pub dc: i32,
pub addr: SocketAddr,
pub route: Option<String>,
pub connect_ms: Option<f64>,
pub handshake_ms: Option<f64>,
pub error: Option<String>,
@ -84,6 +86,34 @@ fn pick_target_for_family(reports: &[MePingReport], family: MePingFamily) -> Opt
})
}
fn route_from_egress(egress: Option<UpstreamEgressInfo>) -> Option<String> {
let info = egress?;
match info.route_kind {
UpstreamRouteKind::Direct => {
let src_ip = info
.direct_bind_ip
.or_else(|| info.local_addr.map(|addr| addr.ip()));
let ip = src_ip?;
let mut parts = Vec::new();
if let Some(dev) = detect_interface_for_ip(ip) {
parts.push(format!("dev={dev}"));
}
parts.push(format!("src={ip}"));
Some(format!("direct {}", parts.join(" ")))
}
UpstreamRouteKind::Socks4 => Some(
info.socks_bound_addr
.map(|addr| format!("socks4 bnd={addr}"))
.unwrap_or_else(|| "socks4".to_string()),
),
UpstreamRouteKind::Socks5 => Some(
info.socks_bound_addr
.map(|addr| format!("socks5 bnd={addr}"))
.unwrap_or_else(|| "socks5".to_string()),
),
}
}
#[cfg(unix)]
fn detect_interface_for_ip(ip: IpAddr) -> Option<String> {
use nix::ifaddrs::getifaddrs;
@ -160,6 +190,15 @@ pub async fn format_me_route(
v4_ok: bool,
v6_ok: bool,
) -> String {
if let Some(route) = reports
.iter()
.flat_map(|report| report.samples.iter())
.find(|sample| sample.error.is_none() && sample.handshake_ms.is_some())
.and_then(|sample| sample.route.clone())
{
return route;
}
let enabled_upstreams: Vec<_> = upstreams.iter().filter(|u| u.enabled).collect();
if enabled_upstreams.is_empty() {
return detect_direct_route_details(reports, prefer_ipv6, v4_ok, v6_ok)
@ -222,6 +261,7 @@ mod tests {
let s = sample(MePingSample {
dc: 4,
addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 8888),
route: Some("direct src=1.2.3.4".to_string()),
connect_ms: Some(12.3),
handshake_ms: Some(34.7),
error: None,
@ -238,6 +278,7 @@ mod tests {
let s = sample(MePingSample {
dc: -5,
addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8)), 80),
route: Some("socks5".to_string()),
connect_ms: Some(10.0),
handshake_ms: None,
error: Some("handshake timeout".to_string()),
@ -278,10 +319,12 @@ pub async fn run_me_ping(pool: &Arc<MePool>, rng: &SecureRandom) -> Vec<MePingRe
let mut connect_ms = None;
let mut handshake_ms = None;
let mut error = None;
let mut route = None;
match pool.connect_tcp(addr).await {
Ok((stream, conn_rtt, upstream_egress)) => {
connect_ms = Some(conn_rtt);
route = route_from_egress(upstream_egress);
match pool.handshake_only(stream, addr, upstream_egress, rng).await {
Ok(hs) => {
handshake_ms = Some(hs.handshake_ms);
@ -302,6 +345,7 @@ pub async fn run_me_ping(pool: &Arc<MePool>, rng: &SecureRandom) -> Vec<MePingRe
samples.push(MePingSample {
dc,
addr,
route,
connect_ms,
handshake_ms,
error,

View File

@ -1,12 +1,13 @@
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU8, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex, Notify, RwLock, mpsc};
use tokio_util::sync::CancellationToken;
use crate::config::MeSocksKdfPolicy;
use crate::crypto::SecureRandom;
use crate::network::IpFamily;
use crate::network::probe::NetworkDecision;
@ -82,6 +83,7 @@ pub struct MePool {
pub(super) me_hardswap_warmup_delay_max_ms: AtomicU64,
pub(super) me_hardswap_warmup_extra_passes: AtomicU32,
pub(super) me_hardswap_warmup_pass_backoff_base_ms: AtomicU64,
pub(super) me_socks_kdf_policy: AtomicU8,
pool_size: usize,
}
@ -145,9 +147,19 @@ impl MePool {
me_hardswap_warmup_delay_max_ms: u64,
me_hardswap_warmup_extra_passes: u8,
me_hardswap_warmup_pass_backoff_base_ms: u64,
me_socks_kdf_policy: MeSocksKdfPolicy,
me_route_backpressure_base_timeout_ms: u64,
me_route_backpressure_high_timeout_ms: u64,
me_route_backpressure_high_watermark_pct: u8,
) -> Arc<Self> {
let registry = Arc::new(ConnRegistry::new());
registry.update_route_backpressure_policy(
me_route_backpressure_base_timeout_ms,
me_route_backpressure_high_timeout_ms,
me_route_backpressure_high_watermark_pct,
);
Arc::new(Self {
registry: Arc::new(ConnRegistry::new()),
registry,
writers: Arc::new(RwLock::new(Vec::new())),
rr: AtomicU64::new(0),
decision,
@ -204,6 +216,7 @@ impl MePool {
me_hardswap_warmup_pass_backoff_base_ms: AtomicU64::new(
me_hardswap_warmup_pass_backoff_base_ms,
),
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
})
}
@ -260,6 +273,26 @@ impl MePool {
&self.registry
}
pub fn update_runtime_transport_policy(
&self,
socks_kdf_policy: MeSocksKdfPolicy,
route_backpressure_base_timeout_ms: u64,
route_backpressure_high_timeout_ms: u64,
route_backpressure_high_watermark_pct: u8,
) {
self.me_socks_kdf_policy
.store(socks_kdf_policy.as_u8(), Ordering::Relaxed);
self.registry.update_route_backpressure_policy(
route_backpressure_base_timeout_ms,
route_backpressure_high_timeout_ms,
route_backpressure_high_watermark_pct,
);
}
pub(super) fn socks_kdf_policy(&self) -> MeSocksKdfPolicy {
MeSocksKdfPolicy::from_u8(self.me_socks_kdf_policy.load(Ordering::Relaxed))
}
pub(super) fn writers_arc(&self) -> Arc<RwLock<Vec<MeWriter>>> {
self.writers.clone()
}

View File

@ -124,7 +124,14 @@ pub(crate) async fn reader_loop(
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(),
RouteResult::QueueFull => stats.increment_me_route_drop_queue_full(),
RouteResult::QueueFullBase => {
stats.increment_me_route_drop_queue_full();
stats.increment_me_route_drop_queue_full_base();
}
RouteResult::QueueFullHigh => {
stats.increment_me_route_drop_queue_full();
stats.increment_me_route_drop_queue_full_high();
}
RouteResult::Routed => {}
}
reg.unregister(cid).await;
@ -140,7 +147,14 @@ pub(crate) async fn reader_loop(
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(),
RouteResult::QueueFull => stats.increment_me_route_drop_queue_full(),
RouteResult::QueueFullBase => {
stats.increment_me_route_drop_queue_full();
stats.increment_me_route_drop_queue_full_base();
}
RouteResult::QueueFullHigh => {
stats.increment_me_route_drop_queue_full();
stats.increment_me_route_drop_queue_full_high();
}
RouteResult::Routed => {}
}
reg.unregister(cid).await;

View File

@ -1,6 +1,6 @@
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
@ -10,14 +10,17 @@ use super::codec::WriterCommand;
use super::MeResponse;
const ROUTE_CHANNEL_CAPACITY: usize = 4096;
const ROUTE_BACKPRESSURE_TIMEOUT: Duration = Duration::from_millis(25);
const ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS: u64 = 25;
const ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS: u64 = 120;
const ROUTE_BACKPRESSURE_HIGH_WATERMARK_PCT: u8 = 80;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouteResult {
Routed,
NoConn,
ChannelClosed,
QueueFull,
QueueFullBase,
QueueFullHigh,
}
#[derive(Clone)]
@ -65,6 +68,9 @@ impl RegistryInner {
pub struct ConnRegistry {
inner: RwLock<RegistryInner>,
next_id: AtomicU64,
route_backpressure_base_timeout_ms: AtomicU64,
route_backpressure_high_timeout_ms: AtomicU64,
route_backpressure_high_watermark_pct: AtomicU8,
}
impl ConnRegistry {
@ -73,9 +79,35 @@ impl ConnRegistry {
Self {
inner: RwLock::new(RegistryInner::new()),
next_id: AtomicU64::new(start),
route_backpressure_base_timeout_ms: AtomicU64::new(
ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS,
),
route_backpressure_high_timeout_ms: AtomicU64::new(
ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS,
),
route_backpressure_high_watermark_pct: AtomicU8::new(
ROUTE_BACKPRESSURE_HIGH_WATERMARK_PCT,
),
}
}
pub fn update_route_backpressure_policy(
&self,
base_timeout_ms: u64,
high_timeout_ms: u64,
high_watermark_pct: u8,
) {
let base = base_timeout_ms.max(1);
let high = high_timeout_ms.max(base);
let watermark = high_watermark_pct.clamp(1, 100);
self.route_backpressure_base_timeout_ms
.store(base, Ordering::Relaxed);
self.route_backpressure_high_timeout_ms
.store(high, Ordering::Relaxed);
self.route_backpressure_high_watermark_pct
.store(watermark, Ordering::Relaxed);
}
pub async fn register(&self) -> (u64, mpsc::Receiver<MeResponse>) {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = mpsc::channel(ROUTE_CHANNEL_CAPACITY);
@ -112,10 +144,40 @@ impl ConnRegistry {
Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed,
Err(TrySendError::Full(resp)) => {
// Absorb short bursts without dropping/closing the session immediately.
match tokio::time::timeout(ROUTE_BACKPRESSURE_TIMEOUT, tx.send(resp)).await {
let base_timeout_ms =
self.route_backpressure_base_timeout_ms.load(Ordering::Relaxed).max(1);
let high_timeout_ms = self
.route_backpressure_high_timeout_ms
.load(Ordering::Relaxed)
.max(base_timeout_ms);
let high_watermark_pct = self
.route_backpressure_high_watermark_pct
.load(Ordering::Relaxed)
.clamp(1, 100);
let used = ROUTE_CHANNEL_CAPACITY.saturating_sub(tx.capacity());
let used_pct = if ROUTE_CHANNEL_CAPACITY == 0 {
100
} else {
(used.saturating_mul(100) / ROUTE_CHANNEL_CAPACITY) as u8
};
let high_profile = used_pct >= high_watermark_pct;
let timeout_ms = if high_profile {
high_timeout_ms
} else {
base_timeout_ms
};
let timeout_dur = Duration::from_millis(timeout_ms);
match tokio::time::timeout(timeout_dur, tx.send(resp)).await {
Ok(Ok(())) => RouteResult::Routed,
Ok(Err(_)) => RouteResult::ChannelClosed,
Err(_) => RouteResult::QueueFull,
Err(_) => {
if high_profile {
RouteResult::QueueFullHigh
} else {
RouteResult::QueueFullBase
}
}
}
}
}