mirror of
https://github.com/telemt/telemt.git
synced 2026-04-14 17:14:09 +03:00
Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb0832b803 | ||
|
|
c01ca40b6d | ||
|
|
cfec6dbb3c | ||
|
|
1fe1acadd4 | ||
|
|
225fc3e4ea | ||
|
|
4a0d88ad43 | ||
|
|
58ff0c7971 | ||
|
|
7d39bf1698 | ||
|
|
3b8eea762b | ||
|
|
07ec84d071 | ||
|
|
235642459a | ||
|
|
3799fc13c4 | ||
|
|
71261522bd | ||
|
|
762deac511 |
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "telemt"
|
||||
version = "3.1.4"
|
||||
version = "3.1.5"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -9,6 +9,9 @@ const DEFAULT_MIDDLE_PROXY_WARM_STANDBY: usize = 16;
|
||||
const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8;
|
||||
const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16;
|
||||
const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2;
|
||||
const DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS: u64 = 90;
|
||||
const DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT: u8 = 1;
|
||||
const DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS: u64 = 180;
|
||||
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 3;
|
||||
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 4;
|
||||
const DEFAULT_LISTEN_ADDR_IPV6: &str = "::";
|
||||
@@ -130,11 +133,11 @@ pub(crate) fn default_middle_proxy_warm_standby() -> usize {
|
||||
}
|
||||
|
||||
pub(crate) fn default_keepalive_interval() -> u64 {
|
||||
25
|
||||
8
|
||||
}
|
||||
|
||||
pub(crate) fn default_keepalive_jitter() -> u64 {
|
||||
5
|
||||
2
|
||||
}
|
||||
|
||||
pub(crate) fn default_warmup_step_delay_ms() -> u64 {
|
||||
@@ -185,6 +188,18 @@ pub(crate) fn default_me_single_endpoint_shadow_rotate_every_secs() -> u64 {
|
||||
900
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_adaptive_floor_idle_secs() -> u64 {
|
||||
DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_adaptive_floor_min_writers_single_endpoint() -> u8 {
|
||||
DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_adaptive_floor_recover_grace_secs() -> u64 {
|
||||
DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS
|
||||
}
|
||||
|
||||
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
|
||||
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher};
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::config::{LogLevel, MeSocksKdfPolicy, MeTelemetryLevel};
|
||||
use crate::config::{LogLevel, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel};
|
||||
use super::load::ProxyConfig;
|
||||
|
||||
// ── Hot fields ────────────────────────────────────────────────────────────────
|
||||
@@ -58,6 +58,10 @@ pub struct HotFields {
|
||||
pub telemetry_user_enabled: bool,
|
||||
pub telemetry_me_level: MeTelemetryLevel,
|
||||
pub me_socks_kdf_policy: MeSocksKdfPolicy,
|
||||
pub me_floor_mode: MeFloorMode,
|
||||
pub me_adaptive_floor_idle_secs: u64,
|
||||
pub me_adaptive_floor_min_writers_single_endpoint: u8,
|
||||
pub me_adaptive_floor_recover_grace_secs: u64,
|
||||
pub me_route_backpressure_base_timeout_ms: u64,
|
||||
pub me_route_backpressure_high_timeout_ms: u64,
|
||||
pub me_route_backpressure_high_watermark_pct: u8,
|
||||
@@ -85,6 +89,14 @@ impl HotFields {
|
||||
telemetry_user_enabled: cfg.general.telemetry.user_enabled,
|
||||
telemetry_me_level: cfg.general.telemetry.me_level,
|
||||
me_socks_kdf_policy: cfg.general.me_socks_kdf_policy,
|
||||
me_floor_mode: cfg.general.me_floor_mode,
|
||||
me_adaptive_floor_idle_secs: cfg.general.me_adaptive_floor_idle_secs,
|
||||
me_adaptive_floor_min_writers_single_endpoint: cfg
|
||||
.general
|
||||
.me_adaptive_floor_min_writers_single_endpoint,
|
||||
me_adaptive_floor_recover_grace_secs: cfg
|
||||
.general
|
||||
.me_adaptive_floor_recover_grace_secs,
|
||||
me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms,
|
||||
me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms,
|
||||
me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct,
|
||||
@@ -309,6 +321,22 @@ fn log_changes(
|
||||
);
|
||||
}
|
||||
|
||||
if old_hot.me_floor_mode != new_hot.me_floor_mode
|
||||
|| old_hot.me_adaptive_floor_idle_secs != new_hot.me_adaptive_floor_idle_secs
|
||||
|| old_hot.me_adaptive_floor_min_writers_single_endpoint
|
||||
!= new_hot.me_adaptive_floor_min_writers_single_endpoint
|
||||
|| old_hot.me_adaptive_floor_recover_grace_secs
|
||||
!= new_hot.me_adaptive_floor_recover_grace_secs
|
||||
{
|
||||
info!(
|
||||
"config reload: me_floor: mode={:?} idle={}s min_single={} recover_grace={}s",
|
||||
new_hot.me_floor_mode,
|
||||
new_hot.me_adaptive_floor_idle_secs,
|
||||
new_hot.me_adaptive_floor_min_writers_single_endpoint,
|
||||
new_hot.me_adaptive_floor_recover_grace_secs,
|
||||
);
|
||||
}
|
||||
|
||||
if old_hot.me_route_backpressure_base_timeout_ms
|
||||
!= new_hot.me_route_backpressure_base_timeout_ms
|
||||
|| old_hot.me_route_backpressure_high_timeout_ms
|
||||
|
||||
@@ -261,6 +261,15 @@ impl ProxyConfig {
|
||||
));
|
||||
}
|
||||
|
||||
if config.general.me_adaptive_floor_min_writers_single_endpoint == 0
|
||||
|| config.general.me_adaptive_floor_min_writers_single_endpoint > 32
|
||||
{
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_adaptive_floor_min_writers_single_endpoint must be within [1, 32]"
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if config.general.me_single_endpoint_outage_backoff_min_ms == 0 {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(),
|
||||
@@ -642,6 +651,19 @@ mod tests {
|
||||
cfg.general.me_single_endpoint_shadow_rotate_every_secs,
|
||||
default_me_single_endpoint_shadow_rotate_every_secs()
|
||||
);
|
||||
assert_eq!(cfg.general.me_floor_mode, MeFloorMode::default());
|
||||
assert_eq!(
|
||||
cfg.general.me_adaptive_floor_idle_secs,
|
||||
default_me_adaptive_floor_idle_secs()
|
||||
);
|
||||
assert_eq!(
|
||||
cfg.general.me_adaptive_floor_min_writers_single_endpoint,
|
||||
default_me_adaptive_floor_min_writers_single_endpoint()
|
||||
);
|
||||
assert_eq!(
|
||||
cfg.general.me_adaptive_floor_recover_grace_secs,
|
||||
default_me_adaptive_floor_recover_grace_secs()
|
||||
);
|
||||
assert_eq!(
|
||||
cfg.general.upstream_connect_retry_attempts,
|
||||
default_upstream_connect_retry_attempts()
|
||||
@@ -704,6 +726,19 @@ mod tests {
|
||||
general.me_single_endpoint_shadow_rotate_every_secs,
|
||||
default_me_single_endpoint_shadow_rotate_every_secs()
|
||||
);
|
||||
assert_eq!(general.me_floor_mode, MeFloorMode::default());
|
||||
assert_eq!(
|
||||
general.me_adaptive_floor_idle_secs,
|
||||
default_me_adaptive_floor_idle_secs()
|
||||
);
|
||||
assert_eq!(
|
||||
general.me_adaptive_floor_min_writers_single_endpoint,
|
||||
default_me_adaptive_floor_min_writers_single_endpoint()
|
||||
);
|
||||
assert_eq!(
|
||||
general.me_adaptive_floor_recover_grace_secs,
|
||||
default_me_adaptive_floor_recover_grace_secs()
|
||||
);
|
||||
assert_eq!(
|
||||
general.upstream_connect_retry_attempts,
|
||||
default_upstream_connect_retry_attempts()
|
||||
@@ -931,6 +966,50 @@ mod tests {
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn me_adaptive_floor_min_writers_out_of_range_is_rejected() {
|
||||
let toml = r#"
|
||||
[general]
|
||||
me_adaptive_floor_min_writers_single_endpoint = 0
|
||||
|
||||
[censorship]
|
||||
tls_domain = "example.com"
|
||||
|
||||
[access.users]
|
||||
user = "00000000000000000000000000000000"
|
||||
"#;
|
||||
let dir = std::env::temp_dir();
|
||||
let path = dir.join("telemt_me_adaptive_floor_min_writers_out_of_range_test.toml");
|
||||
std::fs::write(&path, toml).unwrap();
|
||||
let err = ProxyConfig::load(&path).unwrap_err().to_string();
|
||||
assert!(
|
||||
err.contains(
|
||||
"general.me_adaptive_floor_min_writers_single_endpoint must be within [1, 32]"
|
||||
)
|
||||
);
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn me_floor_mode_adaptive_is_parsed() {
|
||||
let toml = r#"
|
||||
[general]
|
||||
me_floor_mode = "adaptive"
|
||||
|
||||
[censorship]
|
||||
tls_domain = "example.com"
|
||||
|
||||
[access.users]
|
||||
user = "00000000000000000000000000000000"
|
||||
"#;
|
||||
let dir = std::env::temp_dir();
|
||||
let path = dir.join("telemt_me_floor_mode_adaptive_test.toml");
|
||||
std::fs::write(&path, toml).unwrap();
|
||||
let cfg = ProxyConfig::load(&path).unwrap();
|
||||
assert_eq!(cfg.general.me_floor_mode, MeFloorMode::Adaptive);
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn upstream_connect_retry_attempts_zero_is_rejected() {
|
||||
let toml = r#"
|
||||
|
||||
@@ -158,6 +158,31 @@ impl MeBindStaleMode {
|
||||
}
|
||||
}
|
||||
|
||||
/// Middle-End writer floor policy mode.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum MeFloorMode {
|
||||
#[default]
|
||||
Static,
|
||||
Adaptive,
|
||||
}
|
||||
|
||||
impl MeFloorMode {
|
||||
pub fn as_u8(self) -> u8 {
|
||||
match self {
|
||||
MeFloorMode::Static => 0,
|
||||
MeFloorMode::Adaptive => 1,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_u8(raw: u8) -> Self {
|
||||
match raw {
|
||||
1 => MeFloorMode::Adaptive,
|
||||
_ => MeFloorMode::Static,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Telemetry controls for hot-path counters and ME diagnostics.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct TelemetryConfig {
|
||||
@@ -419,6 +444,22 @@ pub struct GeneralConfig {
|
||||
#[serde(default = "default_me_single_endpoint_shadow_rotate_every_secs")]
|
||||
pub me_single_endpoint_shadow_rotate_every_secs: u64,
|
||||
|
||||
/// Floor policy mode for ME writer targets.
|
||||
#[serde(default)]
|
||||
pub me_floor_mode: MeFloorMode,
|
||||
|
||||
/// Idle time in seconds before adaptive floor can reduce single-endpoint writer target.
|
||||
#[serde(default = "default_me_adaptive_floor_idle_secs")]
|
||||
pub me_adaptive_floor_idle_secs: u64,
|
||||
|
||||
/// Minimum writer target for single-endpoint DC groups in adaptive floor mode.
|
||||
#[serde(default = "default_me_adaptive_floor_min_writers_single_endpoint")]
|
||||
pub me_adaptive_floor_min_writers_single_endpoint: u8,
|
||||
|
||||
/// Grace period in seconds to hold static floor after activity in adaptive mode.
|
||||
#[serde(default = "default_me_adaptive_floor_recover_grace_secs")]
|
||||
pub me_adaptive_floor_recover_grace_secs: u64,
|
||||
|
||||
/// Connect attempts for the selected upstream before returning error/fallback.
|
||||
#[serde(default = "default_upstream_connect_retry_attempts")]
|
||||
pub upstream_connect_retry_attempts: u32,
|
||||
@@ -634,6 +675,10 @@ impl Default for GeneralConfig {
|
||||
me_single_endpoint_outage_backoff_min_ms: default_me_single_endpoint_outage_backoff_min_ms(),
|
||||
me_single_endpoint_outage_backoff_max_ms: default_me_single_endpoint_outage_backoff_max_ms(),
|
||||
me_single_endpoint_shadow_rotate_every_secs: default_me_single_endpoint_shadow_rotate_every_secs(),
|
||||
me_floor_mode: MeFloorMode::default(),
|
||||
me_adaptive_floor_idle_secs: default_me_adaptive_floor_idle_secs(),
|
||||
me_adaptive_floor_min_writers_single_endpoint: default_me_adaptive_floor_min_writers_single_endpoint(),
|
||||
me_adaptive_floor_recover_grace_secs: default_me_adaptive_floor_recover_grace_secs(),
|
||||
upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(),
|
||||
upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(),
|
||||
upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(),
|
||||
|
||||
@@ -544,6 +544,10 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
config.general.me_single_endpoint_outage_backoff_min_ms,
|
||||
config.general.me_single_endpoint_outage_backoff_max_ms,
|
||||
config.general.me_single_endpoint_shadow_rotate_every_secs,
|
||||
config.general.me_floor_mode,
|
||||
config.general.me_adaptive_floor_idle_secs,
|
||||
config.general.me_adaptive_floor_min_writers_single_endpoint,
|
||||
config.general.me_adaptive_floor_recover_grace_secs,
|
||||
config.general.hardswap,
|
||||
config.general.me_pool_drain_ttl_secs,
|
||||
config.general.effective_me_pool_force_close_secs(),
|
||||
|
||||
@@ -449,6 +449,21 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_kdf_port_only_drift_total ME KDF client-port changes with stable non-port material"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_me_kdf_port_only_drift_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_kdf_port_only_drift_total {}",
|
||||
if me_allows_debug {
|
||||
stats.get_me_kdf_port_only_drift_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_hardswap_pending_reuse_total Hardswap cycles that reused an existing pending generation"
|
||||
@@ -587,6 +602,82 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_single_endpoint_shadow_rotate_skipped_quarantine_total Shadow rotations skipped because endpoint is quarantined"
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# TYPE telemt_me_single_endpoint_shadow_rotate_skipped_quarantine_total counter"
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_single_endpoint_shadow_rotate_skipped_quarantine_total {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_single_endpoint_shadow_rotate_skipped_quarantine_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_floor_mode Runtime ME writer floor policy mode"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_me_floor_mode gauge");
|
||||
let floor_mode = config.general.me_floor_mode;
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_floor_mode{{mode=\"static\"}} {}",
|
||||
if matches!(floor_mode, crate::config::MeFloorMode::Static) {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_floor_mode{{mode=\"adaptive\"}} {}",
|
||||
if matches!(floor_mode, crate::config::MeFloorMode::Adaptive) {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_floor_mode_switch_all_total Runtime ME floor mode switches"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_me_floor_mode_switch_all_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_floor_mode_switch_all_total {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_floor_mode_switch_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_floor_mode_switch_total{{from=\"static\",to=\"adaptive\"}} {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_floor_mode_switch_static_to_adaptive_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_floor_mode_switch_total{{from=\"adaptive\",to=\"static\"}} {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_floor_mode_switch_adaptive_to_static_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths");
|
||||
let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
|
||||
let _ = writeln!(
|
||||
@@ -679,7 +770,7 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_pool_swap_total {}",
|
||||
if me_allows_debug {
|
||||
if me_allows_normal {
|
||||
stats.get_pool_swap_total()
|
||||
} else {
|
||||
0
|
||||
|
||||
@@ -38,6 +38,7 @@ pub struct Stats {
|
||||
me_seq_mismatch: AtomicU64,
|
||||
me_endpoint_quarantine_total: AtomicU64,
|
||||
me_kdf_drift_total: AtomicU64,
|
||||
me_kdf_port_only_drift_total: AtomicU64,
|
||||
me_hardswap_pending_reuse_total: AtomicU64,
|
||||
me_hardswap_pending_ttl_expired_total: AtomicU64,
|
||||
me_single_endpoint_outage_enter_total: AtomicU64,
|
||||
@@ -46,6 +47,10 @@ pub struct Stats {
|
||||
me_single_endpoint_outage_reconnect_success_total: AtomicU64,
|
||||
me_single_endpoint_quarantine_bypass_total: AtomicU64,
|
||||
me_single_endpoint_shadow_rotate_total: AtomicU64,
|
||||
me_single_endpoint_shadow_rotate_skipped_quarantine_total: AtomicU64,
|
||||
me_floor_mode_switch_total: AtomicU64,
|
||||
me_floor_mode_switch_static_to_adaptive_total: AtomicU64,
|
||||
me_floor_mode_switch_adaptive_to_static_total: AtomicU64,
|
||||
me_handshake_error_codes: DashMap<i32, AtomicU64>,
|
||||
me_route_drop_no_conn: AtomicU64,
|
||||
me_route_drop_channel_closed: AtomicU64,
|
||||
@@ -290,7 +295,7 @@ impl Stats {
|
||||
}
|
||||
}
|
||||
pub fn increment_pool_swap_total(&self) {
|
||||
if self.telemetry_me_allows_debug() {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.pool_swap_total.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
@@ -377,6 +382,12 @@ impl Stats {
|
||||
self.me_kdf_drift_total.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_kdf_port_only_drift_total(&self) {
|
||||
if self.telemetry_me_allows_debug() {
|
||||
self.me_kdf_port_only_drift_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_hardswap_pending_reuse_total(&self) {
|
||||
if self.telemetry_me_allows_debug() {
|
||||
self.me_hardswap_pending_reuse_total
|
||||
@@ -425,6 +436,30 @@ impl Stats {
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_single_endpoint_shadow_rotate_skipped_quarantine_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_single_endpoint_shadow_rotate_skipped_quarantine_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_floor_mode_switch_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_floor_mode_switch_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_floor_mode_switch_static_to_adaptive_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_floor_mode_switch_static_to_adaptive_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_floor_mode_switch_adaptive_to_static_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_floor_mode_switch_adaptive_to_static_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
||||
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
||||
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
|
||||
@@ -447,6 +482,9 @@ impl Stats {
|
||||
pub fn get_me_kdf_drift_total(&self) -> u64 {
|
||||
self.me_kdf_drift_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_kdf_port_only_drift_total(&self) -> u64 {
|
||||
self.me_kdf_port_only_drift_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_hardswap_pending_reuse_total(&self) -> u64 {
|
||||
self.me_hardswap_pending_reuse_total
|
||||
.load(Ordering::Relaxed)
|
||||
@@ -479,6 +517,21 @@ impl Stats {
|
||||
self.me_single_endpoint_shadow_rotate_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_single_endpoint_shadow_rotate_skipped_quarantine_total(&self) -> u64 {
|
||||
self.me_single_endpoint_shadow_rotate_skipped_quarantine_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_floor_mode_switch_total(&self) -> u64 {
|
||||
self.me_floor_mode_switch_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_floor_mode_switch_static_to_adaptive_total(&self) -> u64 {
|
||||
self.me_floor_mode_switch_static_to_adaptive_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_floor_mode_switch_adaptive_to_static_total(&self) -> u64 {
|
||||
self.me_floor_mode_switch_adaptive_to_static_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_handshake_error_code_counts(&self) -> Vec<(i32, u64)> {
|
||||
let mut out: Vec<(i32, u64)> = self
|
||||
.me_handshake_error_codes
|
||||
|
||||
@@ -282,6 +282,10 @@ async fn run_update_cycle(
|
||||
cfg.general.me_single_endpoint_outage_backoff_min_ms,
|
||||
cfg.general.me_single_endpoint_outage_backoff_max_ms,
|
||||
cfg.general.me_single_endpoint_shadow_rotate_every_secs,
|
||||
cfg.general.me_floor_mode,
|
||||
cfg.general.me_adaptive_floor_idle_secs,
|
||||
cfg.general.me_adaptive_floor_min_writers_single_endpoint,
|
||||
cfg.general.me_adaptive_floor_recover_grace_secs,
|
||||
);
|
||||
|
||||
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
|
||||
@@ -490,6 +494,10 @@ pub async fn me_config_updater(
|
||||
cfg.general.me_single_endpoint_outage_backoff_min_ms,
|
||||
cfg.general.me_single_endpoint_outage_backoff_max_ms,
|
||||
cfg.general.me_single_endpoint_shadow_rotate_every_secs,
|
||||
cfg.general.me_floor_mode,
|
||||
cfg.general.me_adaptive_floor_idle_secs,
|
||||
cfg.general.me_adaptive_floor_min_writers_single_endpoint,
|
||||
cfg.general.me_adaptive_floor_recover_grace_secs,
|
||||
);
|
||||
let new_secs = cfg.general.effective_update_every_secs().max(1);
|
||||
if new_secs == update_every_secs {
|
||||
|
||||
@@ -38,6 +38,22 @@ use super::MePool;
|
||||
|
||||
const ME_KDF_DRIFT_STRICT: bool = false;
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||
enum KdfClientPortSource {
|
||||
LocalSocket = 0,
|
||||
SocksBound = 1,
|
||||
}
|
||||
|
||||
impl KdfClientPortSource {
|
||||
fn from_socks_bound_port(socks_bound_port: Option<u16>) -> Self {
|
||||
if socks_bound_port.is_some() {
|
||||
Self::SocksBound
|
||||
} else {
|
||||
Self::LocalSocket
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Result of a successful ME handshake with timings.
|
||||
pub(crate) struct HandshakeOutput {
|
||||
pub rd: ReadHalf<TcpStream>,
|
||||
@@ -52,18 +68,18 @@ pub(crate) struct HandshakeOutput {
|
||||
|
||||
impl MePool {
|
||||
fn kdf_material_fingerprint(
|
||||
local_addr_nat: SocketAddr,
|
||||
local_ip_nat: IpAddr,
|
||||
peer_addr_nat: SocketAddr,
|
||||
client_port_for_kdf: u16,
|
||||
reflected: Option<SocketAddr>,
|
||||
socks_bound_addr: Option<SocketAddr>,
|
||||
reflected_ip: Option<IpAddr>,
|
||||
socks_bound_ip: Option<IpAddr>,
|
||||
client_port_source: KdfClientPortSource,
|
||||
) -> u64 {
|
||||
let mut hasher = DefaultHasher::new();
|
||||
local_addr_nat.hash(&mut hasher);
|
||||
local_ip_nat.hash(&mut hasher);
|
||||
peer_addr_nat.hash(&mut hasher);
|
||||
client_port_for_kdf.hash(&mut hasher);
|
||||
reflected.hash(&mut hasher);
|
||||
socks_bound_addr.hash(&mut hasher);
|
||||
reflected_ip.hash(&mut hasher);
|
||||
socks_bound_ip.hash(&mut hasher);
|
||||
client_port_source.hash(&mut hasher);
|
||||
hasher.finish()
|
||||
}
|
||||
|
||||
@@ -359,35 +375,48 @@ impl MePool {
|
||||
|
||||
let ts_bytes = crypto_ts.to_le_bytes();
|
||||
let server_port_bytes = peer_addr_nat.port().to_le_bytes();
|
||||
let client_port_for_kdf = socks_bound_addr
|
||||
let socks_bound_port = socks_bound_addr
|
||||
.map(|bound| bound.port())
|
||||
.filter(|port| *port != 0)
|
||||
.unwrap_or(local_addr_nat.port());
|
||||
.filter(|port| *port != 0);
|
||||
let client_port_for_kdf = socks_bound_port.unwrap_or(local_addr_nat.port());
|
||||
let client_port_source = KdfClientPortSource::from_socks_bound_port(socks_bound_port);
|
||||
let kdf_fingerprint = Self::kdf_material_fingerprint(
|
||||
local_addr_nat,
|
||||
local_addr_nat.ip(),
|
||||
peer_addr_nat,
|
||||
client_port_for_kdf,
|
||||
reflected,
|
||||
socks_bound_addr,
|
||||
reflected.map(|value| value.ip()),
|
||||
socks_bound_addr.map(|value| value.ip()),
|
||||
client_port_source,
|
||||
);
|
||||
let mut kdf_fingerprint_guard = self.kdf_material_fingerprint.lock().await;
|
||||
if let Some(prev_fingerprint) = kdf_fingerprint_guard.get(&peer_addr_nat).copied()
|
||||
&& prev_fingerprint != kdf_fingerprint
|
||||
if let Some((prev_fingerprint, prev_client_port)) =
|
||||
kdf_fingerprint_guard.get(&peer_addr_nat).copied()
|
||||
{
|
||||
self.stats.increment_me_kdf_drift_total();
|
||||
warn!(
|
||||
%peer_addr_nat,
|
||||
%local_addr_nat,
|
||||
client_port_for_kdf,
|
||||
"ME KDF input drift detected for endpoint"
|
||||
);
|
||||
if ME_KDF_DRIFT_STRICT {
|
||||
return Err(ProxyError::InvalidHandshake(
|
||||
"ME KDF input drift detected (strict mode)".to_string(),
|
||||
));
|
||||
if prev_fingerprint != kdf_fingerprint {
|
||||
self.stats.increment_me_kdf_drift_total();
|
||||
warn!(
|
||||
%peer_addr_nat,
|
||||
%local_addr_nat,
|
||||
client_port_for_kdf,
|
||||
client_port_source = ?client_port_source,
|
||||
"ME KDF material drift detected for endpoint"
|
||||
);
|
||||
if ME_KDF_DRIFT_STRICT {
|
||||
return Err(ProxyError::InvalidHandshake(
|
||||
"ME KDF material drift detected (strict mode)".to_string(),
|
||||
));
|
||||
}
|
||||
} else if prev_client_port != client_port_for_kdf {
|
||||
self.stats.increment_me_kdf_port_only_drift_total();
|
||||
debug!(
|
||||
%peer_addr_nat,
|
||||
previous_client_port_for_kdf = prev_client_port,
|
||||
client_port_for_kdf,
|
||||
client_port_source = ?client_port_source,
|
||||
"ME KDF client port changed with stable material"
|
||||
);
|
||||
}
|
||||
}
|
||||
kdf_fingerprint_guard.insert(peer_addr_nat, kdf_fingerprint);
|
||||
kdf_fingerprint_guard.insert(peer_addr_nat, (kdf_fingerprint, client_port_for_kdf));
|
||||
drop(kdf_fingerprint_guard);
|
||||
|
||||
let client_port_bytes = client_port_for_kdf.to_le_bytes();
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::time::{Duration, Instant};
|
||||
use rand::Rng;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::config::MeFloorMode;
|
||||
use crate::crypto::SecureRandom;
|
||||
use crate::network::IpFamily;
|
||||
|
||||
@@ -26,6 +27,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||
let mut outage_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new();
|
||||
let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
|
||||
pool.prune_closed_writers().await;
|
||||
@@ -40,6 +43,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||
&mut outage_next_attempt,
|
||||
&mut single_endpoint_outage,
|
||||
&mut shadow_rotate_deadline,
|
||||
&mut adaptive_idle_since,
|
||||
&mut adaptive_recover_until,
|
||||
)
|
||||
.await;
|
||||
check_family(
|
||||
@@ -53,6 +58,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||
&mut outage_next_attempt,
|
||||
&mut single_endpoint_outage,
|
||||
&mut shadow_rotate_deadline,
|
||||
&mut adaptive_idle_since,
|
||||
&mut adaptive_recover_until,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -69,6 +76,8 @@ async fn check_family(
|
||||
outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
single_endpoint_outage: &mut HashSet<(i32, IpFamily)>,
|
||||
shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
) {
|
||||
let enabled = match family {
|
||||
IpFamily::V4 => pool.decision.ipv4_me,
|
||||
@@ -95,6 +104,11 @@ async fn check_family(
|
||||
endpoints.dedup();
|
||||
}
|
||||
|
||||
if pool.floor_mode() == MeFloorMode::Static {
|
||||
adaptive_idle_since.clear();
|
||||
adaptive_recover_until.clear();
|
||||
}
|
||||
|
||||
let mut live_addr_counts = HashMap::<SocketAddr, usize>::new();
|
||||
let mut live_writer_ids_by_addr = HashMap::<SocketAddr, Vec<u64>>::new();
|
||||
for writer in pool.writers.read().await.iter().filter(|w| {
|
||||
@@ -111,12 +125,21 @@ async fn check_family(
|
||||
if endpoints.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let required = pool.required_writers_for_dc(endpoints.len());
|
||||
let key = (dc, family);
|
||||
let reduce_for_idle = should_reduce_floor_for_idle(
|
||||
pool,
|
||||
key,
|
||||
&endpoints,
|
||||
&live_writer_ids_by_addr,
|
||||
adaptive_idle_since,
|
||||
adaptive_recover_until,
|
||||
)
|
||||
.await;
|
||||
let required = pool.required_writers_for_dc_with_floor_mode(endpoints.len(), reduce_for_idle);
|
||||
let alive = endpoints
|
||||
.iter()
|
||||
.map(|addr| *live_addr_counts.get(addr).unwrap_or(&0))
|
||||
.sum::<usize>();
|
||||
let key = (dc, family);
|
||||
|
||||
if endpoints.len() == 1 && pool.single_endpoint_outage_mode_enabled() && alive == 0 {
|
||||
if single_endpoint_outage.insert(key) {
|
||||
@@ -148,6 +171,8 @@ async fn check_family(
|
||||
outage_backoff.remove(&key);
|
||||
outage_next_attempt.remove(&key);
|
||||
shadow_rotate_deadline.remove(&key);
|
||||
adaptive_idle_since.remove(&key);
|
||||
adaptive_recover_until.remove(&key);
|
||||
info!(
|
||||
dc = %dc,
|
||||
?family,
|
||||
@@ -262,6 +287,54 @@ async fn check_family(
|
||||
}
|
||||
}
|
||||
|
||||
async fn should_reduce_floor_for_idle(
|
||||
pool: &Arc<MePool>,
|
||||
key: (i32, IpFamily),
|
||||
endpoints: &[SocketAddr],
|
||||
live_writer_ids_by_addr: &HashMap<SocketAddr, Vec<u64>>,
|
||||
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
) -> bool {
|
||||
if endpoints.len() != 1 || pool.floor_mode() != MeFloorMode::Adaptive {
|
||||
adaptive_idle_since.remove(&key);
|
||||
adaptive_recover_until.remove(&key);
|
||||
return false;
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
let endpoint = endpoints[0];
|
||||
let writer_ids = live_writer_ids_by_addr
|
||||
.get(&endpoint)
|
||||
.map(Vec::as_slice)
|
||||
.unwrap_or(&[]);
|
||||
let has_bound_clients = has_bound_clients_on_endpoint(pool, writer_ids).await;
|
||||
if has_bound_clients {
|
||||
adaptive_idle_since.remove(&key);
|
||||
adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration());
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Some(recover_until) = adaptive_recover_until.get(&key)
|
||||
&& now < *recover_until
|
||||
{
|
||||
adaptive_idle_since.remove(&key);
|
||||
return false;
|
||||
}
|
||||
adaptive_recover_until.remove(&key);
|
||||
|
||||
let idle_since = adaptive_idle_since.entry(key).or_insert(now);
|
||||
now.saturating_duration_since(*idle_since) >= pool.adaptive_floor_idle_duration()
|
||||
}
|
||||
|
||||
async fn has_bound_clients_on_endpoint(pool: &Arc<MePool>, writer_ids: &[u64]) -> bool {
|
||||
for writer_id in writer_ids {
|
||||
if !pool.registry.is_writer_empty(*writer_id).await {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
async fn recover_single_endpoint_outage(
|
||||
pool: &Arc<MePool>,
|
||||
rng: &Arc<SecureRandom>,
|
||||
@@ -395,6 +468,19 @@ async fn maybe_rotate_single_endpoint_shadow(
|
||||
}
|
||||
|
||||
let endpoint = endpoints[0];
|
||||
if pool.is_endpoint_quarantined(endpoint).await {
|
||||
pool.stats
|
||||
.increment_me_single_endpoint_shadow_rotate_skipped_quarantine_total();
|
||||
shadow_rotate_deadline.insert(key, now + Duration::from_secs(SHADOW_ROTATE_RETRY_SECS));
|
||||
debug!(
|
||||
dc = %dc,
|
||||
?family,
|
||||
%endpoint,
|
||||
"Single-endpoint shadow rotation skipped: endpoint is quarantined"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let Some(writer_ids) = live_writer_ids_by_addr.get(&endpoint) else {
|
||||
shadow_rotate_deadline.insert(key, now + Duration::from_secs(SHADOW_ROTATE_RETRY_SECS));
|
||||
return;
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
use tokio::sync::{Mutex, Notify, RwLock, mpsc};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::config::{MeBindStaleMode, MeSocksKdfPolicy};
|
||||
use crate::config::{MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy};
|
||||
use crate::crypto::SecureRandom;
|
||||
use crate::network::IpFamily;
|
||||
use crate::network::probe::NetworkDecision;
|
||||
@@ -107,6 +107,10 @@ pub struct MePool {
|
||||
pub(super) me_single_endpoint_outage_backoff_min_ms: AtomicU64,
|
||||
pub(super) me_single_endpoint_outage_backoff_max_ms: AtomicU64,
|
||||
pub(super) me_single_endpoint_shadow_rotate_every_secs: AtomicU64,
|
||||
pub(super) me_floor_mode: AtomicU8,
|
||||
pub(super) me_adaptive_floor_idle_secs: AtomicU64,
|
||||
pub(super) me_adaptive_floor_min_writers_single_endpoint: AtomicU8,
|
||||
pub(super) me_adaptive_floor_recover_grace_secs: AtomicU64,
|
||||
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
||||
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
||||
pub(super) default_dc: AtomicI32,
|
||||
@@ -127,7 +131,7 @@ pub struct MePool {
|
||||
pub(super) pending_hardswap_map_hash: AtomicU64,
|
||||
pub(super) hardswap: AtomicBool,
|
||||
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
|
||||
pub(super) kdf_material_fingerprint: Arc<Mutex<HashMap<SocketAddr, u64>>>,
|
||||
pub(super) kdf_material_fingerprint: Arc<Mutex<HashMap<SocketAddr, (u64, u16)>>>,
|
||||
pub(super) me_pool_drain_ttl_secs: AtomicU64,
|
||||
pub(super) me_pool_force_close_secs: AtomicU64,
|
||||
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
|
||||
@@ -201,6 +205,10 @@ impl MePool {
|
||||
me_single_endpoint_outage_backoff_min_ms: u64,
|
||||
me_single_endpoint_outage_backoff_max_ms: u64,
|
||||
me_single_endpoint_shadow_rotate_every_secs: u64,
|
||||
me_floor_mode: MeFloorMode,
|
||||
me_adaptive_floor_idle_secs: u64,
|
||||
me_adaptive_floor_min_writers_single_endpoint: u8,
|
||||
me_adaptive_floor_recover_grace_secs: u64,
|
||||
hardswap: bool,
|
||||
me_pool_drain_ttl_secs: u64,
|
||||
me_pool_force_close_secs: u64,
|
||||
@@ -287,6 +295,14 @@ impl MePool {
|
||||
me_single_endpoint_shadow_rotate_every_secs: AtomicU64::new(
|
||||
me_single_endpoint_shadow_rotate_every_secs,
|
||||
),
|
||||
me_floor_mode: AtomicU8::new(me_floor_mode.as_u8()),
|
||||
me_adaptive_floor_idle_secs: AtomicU64::new(me_adaptive_floor_idle_secs),
|
||||
me_adaptive_floor_min_writers_single_endpoint: AtomicU8::new(
|
||||
me_adaptive_floor_min_writers_single_endpoint,
|
||||
),
|
||||
me_adaptive_floor_recover_grace_secs: AtomicU64::new(
|
||||
me_adaptive_floor_recover_grace_secs,
|
||||
),
|
||||
pool_size: 2,
|
||||
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
|
||||
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
|
||||
@@ -351,6 +367,10 @@ impl MePool {
|
||||
single_endpoint_outage_backoff_min_ms: u64,
|
||||
single_endpoint_outage_backoff_max_ms: u64,
|
||||
single_endpoint_shadow_rotate_every_secs: u64,
|
||||
floor_mode: MeFloorMode,
|
||||
adaptive_floor_idle_secs: u64,
|
||||
adaptive_floor_min_writers_single_endpoint: u8,
|
||||
adaptive_floor_recover_grace_secs: u64,
|
||||
) {
|
||||
self.hardswap.store(hardswap, Ordering::Relaxed);
|
||||
self.me_pool_drain_ttl_secs
|
||||
@@ -387,6 +407,29 @@ impl MePool {
|
||||
.store(single_endpoint_outage_backoff_max_ms, Ordering::Relaxed);
|
||||
self.me_single_endpoint_shadow_rotate_every_secs
|
||||
.store(single_endpoint_shadow_rotate_every_secs, Ordering::Relaxed);
|
||||
let previous_floor_mode = self.floor_mode();
|
||||
self.me_floor_mode
|
||||
.store(floor_mode.as_u8(), Ordering::Relaxed);
|
||||
self.me_adaptive_floor_idle_secs
|
||||
.store(adaptive_floor_idle_secs, Ordering::Relaxed);
|
||||
self.me_adaptive_floor_min_writers_single_endpoint
|
||||
.store(adaptive_floor_min_writers_single_endpoint, Ordering::Relaxed);
|
||||
self.me_adaptive_floor_recover_grace_secs
|
||||
.store(adaptive_floor_recover_grace_secs, Ordering::Relaxed);
|
||||
if previous_floor_mode != floor_mode {
|
||||
self.stats.increment_me_floor_mode_switch_total();
|
||||
match (previous_floor_mode, floor_mode) {
|
||||
(MeFloorMode::Static, MeFloorMode::Adaptive) => {
|
||||
self.stats
|
||||
.increment_me_floor_mode_switch_static_to_adaptive_total();
|
||||
}
|
||||
(MeFloorMode::Adaptive, MeFloorMode::Static) => {
|
||||
self.stats
|
||||
.increment_me_floor_mode_switch_adaptive_to_static_total();
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reset_stun_state(&self) {
|
||||
@@ -464,6 +507,40 @@ impl MePool {
|
||||
endpoint_count.max(3)
|
||||
}
|
||||
|
||||
pub(super) fn floor_mode(&self) -> MeFloorMode {
|
||||
MeFloorMode::from_u8(self.me_floor_mode.load(Ordering::Relaxed))
|
||||
}
|
||||
|
||||
pub(super) fn adaptive_floor_idle_duration(&self) -> Duration {
|
||||
Duration::from_secs(self.me_adaptive_floor_idle_secs.load(Ordering::Relaxed))
|
||||
}
|
||||
|
||||
pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration {
|
||||
Duration::from_secs(
|
||||
self.me_adaptive_floor_recover_grace_secs
|
||||
.load(Ordering::Relaxed),
|
||||
)
|
||||
}
|
||||
|
||||
pub(super) fn required_writers_for_dc_with_floor_mode(
|
||||
&self,
|
||||
endpoint_count: usize,
|
||||
reduce_for_idle: bool,
|
||||
) -> usize {
|
||||
let base_required = self.required_writers_for_dc(endpoint_count);
|
||||
if !reduce_for_idle {
|
||||
return base_required;
|
||||
}
|
||||
if endpoint_count != 1 || self.floor_mode() != MeFloorMode::Adaptive {
|
||||
return base_required;
|
||||
}
|
||||
let min_writers = (self
|
||||
.me_adaptive_floor_min_writers_single_endpoint
|
||||
.load(Ordering::Relaxed) as usize)
|
||||
.max(1);
|
||||
base_required.min(min_writers)
|
||||
}
|
||||
|
||||
pub(super) fn single_endpoint_outage_mode_enabled(&self) -> bool {
|
||||
self.me_single_endpoint_outage_mode_enabled
|
||||
.load(Ordering::Relaxed)
|
||||
|
||||
@@ -37,7 +37,7 @@ impl MePool {
|
||||
);
|
||||
}
|
||||
|
||||
async fn is_endpoint_quarantined(&self, addr: SocketAddr) -> bool {
|
||||
pub(super) async fn is_endpoint_quarantined(&self, addr: SocketAddr) -> bool {
|
||||
let mut guard = self.endpoint_quarantine.lock().await;
|
||||
let now = Instant::now();
|
||||
guard.retain(|_, expiry| *expiry > now);
|
||||
|
||||
Reference in New Issue
Block a user