ME Pool Shadow Writers

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-02 21:04:06 +03:00
parent 9477103f89
commit a6d22e8a57
No known key found for this signature in database
10 changed files with 748 additions and 16 deletions

View File

@ -8,6 +8,7 @@ const DEFAULT_STUN_TCP_FALLBACK: bool = true;
const DEFAULT_MIDDLE_PROXY_WARM_STANDBY: usize = 16; const DEFAULT_MIDDLE_PROXY_WARM_STANDBY: usize = 16;
const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8; const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8;
const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16; const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16;
const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2;
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 3; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 3;
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 4; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 4;
const DEFAULT_LISTEN_ADDR_IPV6: &str = "::"; const DEFAULT_LISTEN_ADDR_IPV6: &str = "::";
@ -160,6 +161,30 @@ pub(crate) fn default_me_reconnect_fast_retry_count() -> u32 {
DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT
} }
pub(crate) fn default_me_single_endpoint_shadow_writers() -> u8 {
DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS
}
pub(crate) fn default_me_single_endpoint_outage_mode_enabled() -> bool {
true
}
pub(crate) fn default_me_single_endpoint_outage_disable_quarantine() -> bool {
true
}
pub(crate) fn default_me_single_endpoint_outage_backoff_min_ms() -> u64 {
250
}
pub(crate) fn default_me_single_endpoint_outage_backoff_max_ms() -> u64 {
3000
}
pub(crate) fn default_me_single_endpoint_shadow_rotate_every_secs() -> u64 {
900
}
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 { pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
} }

View File

@ -255,6 +255,32 @@ impl ProxyConfig {
)); ));
} }
if config.general.me_single_endpoint_shadow_writers > 32 {
return Err(ProxyError::Config(
"general.me_single_endpoint_shadow_writers must be within [0, 32]".to_string(),
));
}
if config.general.me_single_endpoint_outage_backoff_min_ms == 0 {
return Err(ProxyError::Config(
"general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(),
));
}
if config.general.me_single_endpoint_outage_backoff_max_ms == 0 {
return Err(ProxyError::Config(
"general.me_single_endpoint_outage_backoff_max_ms must be > 0".to_string(),
));
}
if config.general.me_single_endpoint_outage_backoff_min_ms
> config.general.me_single_endpoint_outage_backoff_max_ms
{
return Err(ProxyError::Config(
"general.me_single_endpoint_outage_backoff_min_ms must be <= general.me_single_endpoint_outage_backoff_max_ms".to_string(),
));
}
if config.general.beobachten_minutes == 0 { if config.general.beobachten_minutes == 0 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"general.beobachten_minutes must be > 0".to_string(), "general.beobachten_minutes must be > 0".to_string(),
@ -592,6 +618,30 @@ mod tests {
cfg.general.me_reconnect_fast_retry_count, cfg.general.me_reconnect_fast_retry_count,
default_me_reconnect_fast_retry_count() default_me_reconnect_fast_retry_count()
); );
assert_eq!(
cfg.general.me_single_endpoint_shadow_writers,
default_me_single_endpoint_shadow_writers()
);
assert_eq!(
cfg.general.me_single_endpoint_outage_mode_enabled,
default_me_single_endpoint_outage_mode_enabled()
);
assert_eq!(
cfg.general.me_single_endpoint_outage_disable_quarantine,
default_me_single_endpoint_outage_disable_quarantine()
);
assert_eq!(
cfg.general.me_single_endpoint_outage_backoff_min_ms,
default_me_single_endpoint_outage_backoff_min_ms()
);
assert_eq!(
cfg.general.me_single_endpoint_outage_backoff_max_ms,
default_me_single_endpoint_outage_backoff_max_ms()
);
assert_eq!(
cfg.general.me_single_endpoint_shadow_rotate_every_secs,
default_me_single_endpoint_shadow_rotate_every_secs()
);
assert_eq!( assert_eq!(
cfg.general.upstream_connect_retry_attempts, cfg.general.upstream_connect_retry_attempts,
default_upstream_connect_retry_attempts() default_upstream_connect_retry_attempts()
@ -630,6 +680,30 @@ mod tests {
general.me_reconnect_fast_retry_count, general.me_reconnect_fast_retry_count,
default_me_reconnect_fast_retry_count() default_me_reconnect_fast_retry_count()
); );
assert_eq!(
general.me_single_endpoint_shadow_writers,
default_me_single_endpoint_shadow_writers()
);
assert_eq!(
general.me_single_endpoint_outage_mode_enabled,
default_me_single_endpoint_outage_mode_enabled()
);
assert_eq!(
general.me_single_endpoint_outage_disable_quarantine,
default_me_single_endpoint_outage_disable_quarantine()
);
assert_eq!(
general.me_single_endpoint_outage_backoff_min_ms,
default_me_single_endpoint_outage_backoff_min_ms()
);
assert_eq!(
general.me_single_endpoint_outage_backoff_max_ms,
default_me_single_endpoint_outage_backoff_max_ms()
);
assert_eq!(
general.me_single_endpoint_shadow_rotate_every_secs,
default_me_single_endpoint_shadow_rotate_every_secs()
);
assert_eq!( assert_eq!(
general.upstream_connect_retry_attempts, general.upstream_connect_retry_attempts,
default_upstream_connect_retry_attempts() default_upstream_connect_retry_attempts()
@ -814,6 +888,49 @@ mod tests {
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
#[test]
fn me_single_endpoint_outage_backoff_range_is_validated() {
let toml = r#"
[general]
me_single_endpoint_outage_backoff_min_ms = 4000
me_single_endpoint_outage_backoff_max_ms = 3000
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_single_endpoint_outage_backoff_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains(
"general.me_single_endpoint_outage_backoff_min_ms must be <= general.me_single_endpoint_outage_backoff_max_ms"
));
let _ = std::fs::remove_file(path);
}
#[test]
fn me_single_endpoint_shadow_writers_too_large_is_rejected() {
let toml = r#"
[general]
me_single_endpoint_shadow_writers = 33
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_single_endpoint_shadow_writers_limit_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_single_endpoint_shadow_writers must be within [0, 32]"));
let _ = std::fs::remove_file(path);
}
#[test] #[test]
fn upstream_connect_retry_attempts_zero_is_rejected() { fn upstream_connect_retry_attempts_zero_is_rejected() {
let toml = r#" let toml = r#"

View File

@ -394,6 +394,31 @@ pub struct GeneralConfig {
#[serde(default = "default_me_reconnect_fast_retry_count")] #[serde(default = "default_me_reconnect_fast_retry_count")]
pub me_reconnect_fast_retry_count: u32, pub me_reconnect_fast_retry_count: u32,
/// Number of additional reserve writers for DC groups with exactly one endpoint.
#[serde(default = "default_me_single_endpoint_shadow_writers")]
pub me_single_endpoint_shadow_writers: u8,
/// Enable aggressive outage recovery mode for single-endpoint DC groups.
#[serde(default = "default_me_single_endpoint_outage_mode_enabled")]
pub me_single_endpoint_outage_mode_enabled: bool,
/// Ignore endpoint quarantine while in single-endpoint outage mode.
#[serde(default = "default_me_single_endpoint_outage_disable_quarantine")]
pub me_single_endpoint_outage_disable_quarantine: bool,
/// Minimum reconnect backoff in ms for single-endpoint outage mode.
#[serde(default = "default_me_single_endpoint_outage_backoff_min_ms")]
pub me_single_endpoint_outage_backoff_min_ms: u64,
/// Maximum reconnect backoff in ms for single-endpoint outage mode.
#[serde(default = "default_me_single_endpoint_outage_backoff_max_ms")]
pub me_single_endpoint_outage_backoff_max_ms: u64,
/// Periodic shadow writer rotation interval in seconds for single-endpoint DC groups.
/// Set to 0 to disable periodic shadow rotation.
#[serde(default = "default_me_single_endpoint_shadow_rotate_every_secs")]
pub me_single_endpoint_shadow_rotate_every_secs: u64,
/// Connect attempts for the selected upstream before returning error/fallback. /// Connect attempts for the selected upstream before returning error/fallback.
#[serde(default = "default_upstream_connect_retry_attempts")] #[serde(default = "default_upstream_connect_retry_attempts")]
pub upstream_connect_retry_attempts: u32, pub upstream_connect_retry_attempts: u32,
@ -603,6 +628,12 @@ impl Default for GeneralConfig {
me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(), me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(), me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
me_reconnect_fast_retry_count: default_me_reconnect_fast_retry_count(), me_reconnect_fast_retry_count: default_me_reconnect_fast_retry_count(),
me_single_endpoint_shadow_writers: default_me_single_endpoint_shadow_writers(),
me_single_endpoint_outage_mode_enabled: default_me_single_endpoint_outage_mode_enabled(),
me_single_endpoint_outage_disable_quarantine: default_me_single_endpoint_outage_disable_quarantine(),
me_single_endpoint_outage_backoff_min_ms: default_me_single_endpoint_outage_backoff_min_ms(),
me_single_endpoint_outage_backoff_max_ms: default_me_single_endpoint_outage_backoff_max_ms(),
me_single_endpoint_shadow_rotate_every_secs: default_me_single_endpoint_shadow_rotate_every_secs(),
upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(), upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(),
upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(), upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(),
upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(), upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(),

View File

@ -538,6 +538,12 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
config.general.me_reconnect_backoff_base_ms, config.general.me_reconnect_backoff_base_ms,
config.general.me_reconnect_backoff_cap_ms, config.general.me_reconnect_backoff_cap_ms,
config.general.me_reconnect_fast_retry_count, config.general.me_reconnect_fast_retry_count,
config.general.me_single_endpoint_shadow_writers,
config.general.me_single_endpoint_outage_mode_enabled,
config.general.me_single_endpoint_outage_disable_quarantine,
config.general.me_single_endpoint_outage_backoff_min_ms,
config.general.me_single_endpoint_outage_backoff_max_ms,
config.general.me_single_endpoint_shadow_rotate_every_secs,
config.general.hardswap, config.general.hardswap,
config.general.me_pool_drain_ttl_secs, config.general.me_pool_drain_ttl_secs,
config.general.effective_me_pool_force_close_secs(), config.general.effective_me_pool_force_close_secs(),

View File

@ -479,6 +479,114 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
} }
); );
let _ = writeln!(
out,
"# HELP telemt_me_single_endpoint_outage_enter_total Single-endpoint DC outage transitions to active state"
);
let _ = writeln!(
out,
"# TYPE telemt_me_single_endpoint_outage_enter_total counter"
);
let _ = writeln!(
out,
"telemt_me_single_endpoint_outage_enter_total {}",
if me_allows_normal {
stats.get_me_single_endpoint_outage_enter_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_single_endpoint_outage_exit_total Single-endpoint DC outage recovery transitions"
);
let _ = writeln!(
out,
"# TYPE telemt_me_single_endpoint_outage_exit_total counter"
);
let _ = writeln!(
out,
"telemt_me_single_endpoint_outage_exit_total {}",
if me_allows_normal {
stats.get_me_single_endpoint_outage_exit_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_single_endpoint_outage_reconnect_attempt_total Reconnect attempts performed during single-endpoint outages"
);
let _ = writeln!(
out,
"# TYPE telemt_me_single_endpoint_outage_reconnect_attempt_total counter"
);
let _ = writeln!(
out,
"telemt_me_single_endpoint_outage_reconnect_attempt_total {}",
if me_allows_normal {
stats.get_me_single_endpoint_outage_reconnect_attempt_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_single_endpoint_outage_reconnect_success_total Successful reconnect attempts during single-endpoint outages"
);
let _ = writeln!(
out,
"# TYPE telemt_me_single_endpoint_outage_reconnect_success_total counter"
);
let _ = writeln!(
out,
"telemt_me_single_endpoint_outage_reconnect_success_total {}",
if me_allows_normal {
stats.get_me_single_endpoint_outage_reconnect_success_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_single_endpoint_quarantine_bypass_total Outage reconnect attempts that bypassed quarantine"
);
let _ = writeln!(
out,
"# TYPE telemt_me_single_endpoint_quarantine_bypass_total counter"
);
let _ = writeln!(
out,
"telemt_me_single_endpoint_quarantine_bypass_total {}",
if me_allows_normal {
stats.get_me_single_endpoint_quarantine_bypass_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_single_endpoint_shadow_rotate_total Successful periodic shadow rotations for single-endpoint DC groups"
);
let _ = writeln!(
out,
"# TYPE telemt_me_single_endpoint_shadow_rotate_total counter"
);
let _ = writeln!(
out,
"telemt_me_single_endpoint_shadow_rotate_total {}",
if me_allows_normal {
stats.get_me_single_endpoint_shadow_rotate_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths"); let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths");
let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter"); let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
let _ = writeln!( let _ = writeln!(

View File

@ -40,6 +40,12 @@ pub struct Stats {
me_kdf_drift_total: AtomicU64, me_kdf_drift_total: AtomicU64,
me_hardswap_pending_reuse_total: AtomicU64, me_hardswap_pending_reuse_total: AtomicU64,
me_hardswap_pending_ttl_expired_total: AtomicU64, me_hardswap_pending_ttl_expired_total: AtomicU64,
me_single_endpoint_outage_enter_total: AtomicU64,
me_single_endpoint_outage_exit_total: AtomicU64,
me_single_endpoint_outage_reconnect_attempt_total: AtomicU64,
me_single_endpoint_outage_reconnect_success_total: AtomicU64,
me_single_endpoint_quarantine_bypass_total: AtomicU64,
me_single_endpoint_shadow_rotate_total: AtomicU64,
me_handshake_error_codes: DashMap<i32, AtomicU64>, me_handshake_error_codes: DashMap<i32, AtomicU64>,
me_route_drop_no_conn: AtomicU64, me_route_drop_no_conn: AtomicU64,
me_route_drop_channel_closed: AtomicU64, me_route_drop_channel_closed: AtomicU64,
@ -383,6 +389,42 @@ impl Stats {
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
} }
} }
pub fn increment_me_single_endpoint_outage_enter_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_single_endpoint_outage_enter_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_single_endpoint_outage_exit_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_single_endpoint_outage_exit_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_single_endpoint_outage_reconnect_attempt_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_single_endpoint_outage_reconnect_attempt_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_single_endpoint_outage_reconnect_success_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_single_endpoint_outage_reconnect_success_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_single_endpoint_quarantine_bypass_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_single_endpoint_quarantine_bypass_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_single_endpoint_shadow_rotate_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_single_endpoint_shadow_rotate_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
@ -413,6 +455,30 @@ impl Stats {
self.me_hardswap_pending_ttl_expired_total self.me_hardswap_pending_ttl_expired_total
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
} }
pub fn get_me_single_endpoint_outage_enter_total(&self) -> u64 {
self.me_single_endpoint_outage_enter_total
.load(Ordering::Relaxed)
}
pub fn get_me_single_endpoint_outage_exit_total(&self) -> u64 {
self.me_single_endpoint_outage_exit_total
.load(Ordering::Relaxed)
}
pub fn get_me_single_endpoint_outage_reconnect_attempt_total(&self) -> u64 {
self.me_single_endpoint_outage_reconnect_attempt_total
.load(Ordering::Relaxed)
}
pub fn get_me_single_endpoint_outage_reconnect_success_total(&self) -> u64 {
self.me_single_endpoint_outage_reconnect_success_total
.load(Ordering::Relaxed)
}
pub fn get_me_single_endpoint_quarantine_bypass_total(&self) -> u64 {
self.me_single_endpoint_quarantine_bypass_total
.load(Ordering::Relaxed)
}
pub fn get_me_single_endpoint_shadow_rotate_total(&self) -> u64 {
self.me_single_endpoint_shadow_rotate_total
.load(Ordering::Relaxed)
}
pub fn get_me_handshake_error_code_counts(&self) -> Vec<(i32, u64)> { pub fn get_me_handshake_error_code_counts(&self) -> Vec<(i32, u64)> {
let mut out: Vec<(i32, u64)> = self let mut out: Vec<(i32, u64)> = self
.me_handshake_error_codes .me_handshake_error_codes

View File

@ -276,6 +276,12 @@ async fn run_update_cycle(
cfg.general.me_bind_stale_ttl_secs, cfg.general.me_bind_stale_ttl_secs,
cfg.general.me_secret_atomic_snapshot, cfg.general.me_secret_atomic_snapshot,
cfg.general.me_deterministic_writer_sort, cfg.general.me_deterministic_writer_sort,
cfg.general.me_single_endpoint_shadow_writers,
cfg.general.me_single_endpoint_outage_mode_enabled,
cfg.general.me_single_endpoint_outage_disable_quarantine,
cfg.general.me_single_endpoint_outage_backoff_min_ms,
cfg.general.me_single_endpoint_outage_backoff_max_ms,
cfg.general.me_single_endpoint_shadow_rotate_every_secs,
); );
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1); let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
@ -478,6 +484,12 @@ pub async fn me_config_updater(
cfg.general.me_bind_stale_ttl_secs, cfg.general.me_bind_stale_ttl_secs,
cfg.general.me_secret_atomic_snapshot, cfg.general.me_secret_atomic_snapshot,
cfg.general.me_deterministic_writer_sort, cfg.general.me_deterministic_writer_sort,
cfg.general.me_single_endpoint_shadow_writers,
cfg.general.me_single_endpoint_outage_mode_enabled,
cfg.general.me_single_endpoint_outage_disable_quarantine,
cfg.general.me_single_endpoint_outage_backoff_min_ms,
cfg.general.me_single_endpoint_outage_backoff_max_ms,
cfg.general.me_single_endpoint_shadow_rotate_every_secs,
); );
let new_secs = cfg.general.effective_update_every_secs().max(1); let new_secs = cfg.general.effective_update_every_secs().max(1);
if new_secs == update_every_secs { if new_secs == update_every_secs {

View File

@ -1,10 +1,11 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
use rand::Rng; use rand::Rng;
use tracing::{debug, info, warn};
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::network::IpFamily; use crate::network::IpFamily;
@ -15,11 +16,16 @@ const HEALTH_INTERVAL_SECS: u64 = 1;
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
#[allow(dead_code)] #[allow(dead_code)]
const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1; const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1;
const SHADOW_ROTATE_RETRY_SECS: u64 = 30;
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) { pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut inflight: HashMap<(i32, IpFamily), usize> = HashMap::new(); let mut inflight: HashMap<(i32, IpFamily), usize> = HashMap::new();
let mut outage_backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
let mut outage_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new();
let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new();
loop { loop {
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await; tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
pool.prune_closed_writers().await; pool.prune_closed_writers().await;
@ -30,6 +36,10 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut backoff, &mut backoff,
&mut next_attempt, &mut next_attempt,
&mut inflight, &mut inflight,
&mut outage_backoff,
&mut outage_next_attempt,
&mut single_endpoint_outage,
&mut shadow_rotate_deadline,
) )
.await; .await;
check_family( check_family(
@ -39,6 +49,10 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut backoff, &mut backoff,
&mut next_attempt, &mut next_attempt,
&mut inflight, &mut inflight,
&mut outage_backoff,
&mut outage_next_attempt,
&mut single_endpoint_outage,
&mut shadow_rotate_deadline,
) )
.await; .await;
} }
@ -51,6 +65,10 @@ async fn check_family(
backoff: &mut HashMap<(i32, IpFamily), u64>, backoff: &mut HashMap<(i32, IpFamily), u64>,
next_attempt: &mut HashMap<(i32, IpFamily), Instant>, next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
inflight: &mut HashMap<(i32, IpFamily), usize>, inflight: &mut HashMap<(i32, IpFamily), usize>,
outage_backoff: &mut HashMap<(i32, IpFamily), u64>,
outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
single_endpoint_outage: &mut HashSet<(i32, IpFamily)>,
shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
) { ) {
let enabled = match family { let enabled = match family {
IpFamily::V4 => pool.decision.ipv4_me, IpFamily::V4 => pool.decision.ipv4_me,
@ -78,31 +96,86 @@ async fn check_family(
} }
let mut live_addr_counts = HashMap::<SocketAddr, usize>::new(); let mut live_addr_counts = HashMap::<SocketAddr, usize>::new();
for writer in pool let mut live_writer_ids_by_addr = HashMap::<SocketAddr, Vec<u64>>::new();
.writers for writer in pool.writers.read().await.iter().filter(|w| {
.read() !w.draining.load(std::sync::atomic::Ordering::Relaxed)
.await }) {
.iter()
.filter(|w| !w.draining.load(std::sync::atomic::Ordering::Relaxed))
{
*live_addr_counts.entry(writer.addr).or_insert(0) += 1; *live_addr_counts.entry(writer.addr).or_insert(0) += 1;
live_writer_ids_by_addr
.entry(writer.addr)
.or_default()
.push(writer.id);
} }
for (dc, endpoints) in dc_endpoints { for (dc, endpoints) in dc_endpoints {
if endpoints.is_empty() { if endpoints.is_empty() {
continue; continue;
} }
let required = MePool::required_writers_for_dc(endpoints.len()); let required = pool.required_writers_for_dc(endpoints.len());
let alive = endpoints let alive = endpoints
.iter() .iter()
.map(|addr| *live_addr_counts.get(addr).unwrap_or(&0)) .map(|addr| *live_addr_counts.get(addr).unwrap_or(&0))
.sum::<usize>(); .sum::<usize>();
let key = (dc, family);
if endpoints.len() == 1 && pool.single_endpoint_outage_mode_enabled() && alive == 0 {
if single_endpoint_outage.insert(key) {
pool.stats.increment_me_single_endpoint_outage_enter_total();
warn!(
dc = %dc,
?family,
required,
endpoint_count = endpoints.len(),
"Single-endpoint DC outage detected"
);
}
recover_single_endpoint_outage(
pool,
rng,
key,
endpoints[0],
required,
outage_backoff,
outage_next_attempt,
)
.await;
continue;
}
if single_endpoint_outage.remove(&key) {
pool.stats.increment_me_single_endpoint_outage_exit_total();
outage_backoff.remove(&key);
outage_next_attempt.remove(&key);
shadow_rotate_deadline.remove(&key);
info!(
dc = %dc,
?family,
alive,
required,
endpoint_count = endpoints.len(),
"Single-endpoint DC outage recovered"
);
}
if alive >= required { if alive >= required {
maybe_rotate_single_endpoint_shadow(
pool,
rng,
key,
dc,
family,
&endpoints,
alive,
required,
&live_writer_ids_by_addr,
shadow_rotate_deadline,
)
.await;
continue; continue;
} }
let missing = required - alive; let missing = required - alive;
let key = (dc, family);
let now = Instant::now(); let now = Instant::now();
if let Some(ts) = next_attempt.get(&key) if let Some(ts) = next_attempt.get(&key)
&& now < *ts && now < *ts
@ -188,3 +261,207 @@ async fn check_family(
} }
} }
} }
async fn recover_single_endpoint_outage(
pool: &Arc<MePool>,
rng: &Arc<SecureRandom>,
key: (i32, IpFamily),
endpoint: SocketAddr,
required: usize,
outage_backoff: &mut HashMap<(i32, IpFamily), u64>,
outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
) {
let now = Instant::now();
if let Some(ts) = outage_next_attempt.get(&key)
&& now < *ts
{
return;
}
let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms();
pool.stats
.increment_me_single_endpoint_outage_reconnect_attempt_total();
let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine();
let attempt_ok = if bypass_quarantine {
pool.stats
.increment_me_single_endpoint_quarantine_bypass_total();
match tokio::time::timeout(pool.me_one_timeout, pool.connect_one(endpoint, rng.as_ref())).await {
Ok(Ok(())) => true,
Ok(Err(e)) => {
debug!(
dc = %key.0,
family = ?key.1,
%endpoint,
error = %e,
"Single-endpoint outage reconnect failed (quarantine bypass path)"
);
false
}
Err(_) => {
debug!(
dc = %key.0,
family = ?key.1,
%endpoint,
"Single-endpoint outage reconnect timed out (quarantine bypass path)"
);
false
}
}
} else {
let one_endpoint = [endpoint];
match tokio::time::timeout(
pool.me_one_timeout,
pool.connect_endpoints_round_robin(&one_endpoint, rng.as_ref()),
)
.await
{
Ok(ok) => ok,
Err(_) => {
debug!(
dc = %key.0,
family = ?key.1,
%endpoint,
"Single-endpoint outage reconnect timed out"
);
false
}
}
};
if attempt_ok {
pool.stats
.increment_me_single_endpoint_outage_reconnect_success_total();
pool.stats.increment_me_reconnect_success();
outage_backoff.insert(key, min_backoff_ms);
let jitter = min_backoff_ms / JITTER_FRAC_NUM;
let wait = Duration::from_millis(min_backoff_ms)
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
outage_next_attempt.insert(key, now + wait);
info!(
dc = %key.0,
family = ?key.1,
%endpoint,
required,
backoff_ms = min_backoff_ms,
"Single-endpoint outage reconnect succeeded"
);
return;
}
pool.stats.increment_me_reconnect_attempt();
let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms);
let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms);
outage_backoff.insert(key, next_ms);
let jitter = next_ms / JITTER_FRAC_NUM;
let wait = Duration::from_millis(next_ms)
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
outage_next_attempt.insert(key, now + wait);
warn!(
dc = %key.0,
family = ?key.1,
%endpoint,
required,
backoff_ms = next_ms,
"Single-endpoint outage reconnect scheduled"
);
}
async fn maybe_rotate_single_endpoint_shadow(
pool: &Arc<MePool>,
rng: &Arc<SecureRandom>,
key: (i32, IpFamily),
dc: i32,
family: IpFamily,
endpoints: &[SocketAddr],
alive: usize,
required: usize,
live_writer_ids_by_addr: &HashMap<SocketAddr, Vec<u64>>,
shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
) {
if endpoints.len() != 1 || alive < required {
return;
}
let Some(interval) = pool.single_endpoint_shadow_rotate_interval() else {
return;
};
let now = Instant::now();
if let Some(deadline) = shadow_rotate_deadline.get(&key)
&& now < *deadline
{
return;
}
let endpoint = endpoints[0];
let Some(writer_ids) = live_writer_ids_by_addr.get(&endpoint) else {
shadow_rotate_deadline.insert(key, now + Duration::from_secs(SHADOW_ROTATE_RETRY_SECS));
return;
};
let mut candidate_writer_id = None;
for writer_id in writer_ids {
if pool.registry.is_writer_empty(*writer_id).await {
candidate_writer_id = Some(*writer_id);
break;
}
}
let Some(old_writer_id) = candidate_writer_id else {
shadow_rotate_deadline.insert(key, now + Duration::from_secs(SHADOW_ROTATE_RETRY_SECS));
debug!(
dc = %dc,
?family,
%endpoint,
alive,
required,
"Single-endpoint shadow rotation skipped: no empty writer candidate"
);
return;
};
let rotate_ok = match tokio::time::timeout(pool.me_one_timeout, pool.connect_one(endpoint, rng.as_ref())).await {
Ok(Ok(())) => true,
Ok(Err(e)) => {
debug!(
dc = %dc,
?family,
%endpoint,
error = %e,
"Single-endpoint shadow rotation connect failed"
);
false
}
Err(_) => {
debug!(
dc = %dc,
?family,
%endpoint,
"Single-endpoint shadow rotation connect timed out"
);
false
}
};
if !rotate_ok {
shadow_rotate_deadline.insert(
key,
now + interval.min(Duration::from_secs(SHADOW_ROTATE_RETRY_SECS)),
);
return;
}
pool.mark_writer_draining_with_timeout(old_writer_id, pool.force_close_timeout(), false)
.await;
pool.stats.increment_me_single_endpoint_shadow_rotate_total();
shadow_rotate_deadline.insert(key, now + interval);
info!(
dc = %dc,
?family,
%endpoint,
old_writer_id,
rotate_every_secs = interval.as_secs(),
"Single-endpoint shadow writer rotated"
);
}

View File

@ -101,6 +101,12 @@ pub struct MePool {
pub(super) me_reconnect_backoff_base: Duration, pub(super) me_reconnect_backoff_base: Duration,
pub(super) me_reconnect_backoff_cap: Duration, pub(super) me_reconnect_backoff_cap: Duration,
pub(super) me_reconnect_fast_retry_count: u32, pub(super) me_reconnect_fast_retry_count: u32,
pub(super) me_single_endpoint_shadow_writers: AtomicU8,
pub(super) me_single_endpoint_outage_mode_enabled: AtomicBool,
pub(super) me_single_endpoint_outage_disable_quarantine: AtomicBool,
pub(super) me_single_endpoint_outage_backoff_min_ms: AtomicU64,
pub(super) me_single_endpoint_outage_backoff_max_ms: AtomicU64,
pub(super) me_single_endpoint_shadow_rotate_every_secs: AtomicU64,
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>, pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>, pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) default_dc: AtomicI32, pub(super) default_dc: AtomicI32,
@ -189,6 +195,12 @@ impl MePool {
me_reconnect_backoff_base_ms: u64, me_reconnect_backoff_base_ms: u64,
me_reconnect_backoff_cap_ms: u64, me_reconnect_backoff_cap_ms: u64,
me_reconnect_fast_retry_count: u32, me_reconnect_fast_retry_count: u32,
me_single_endpoint_shadow_writers: u8,
me_single_endpoint_outage_mode_enabled: bool,
me_single_endpoint_outage_disable_quarantine: bool,
me_single_endpoint_outage_backoff_min_ms: u64,
me_single_endpoint_outage_backoff_max_ms: u64,
me_single_endpoint_shadow_rotate_every_secs: u64,
hardswap: bool, hardswap: bool,
me_pool_drain_ttl_secs: u64, me_pool_drain_ttl_secs: u64,
me_pool_force_close_secs: u64, me_pool_force_close_secs: u64,
@ -259,6 +271,22 @@ impl MePool {
me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms), me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms),
me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms), me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms),
me_reconnect_fast_retry_count, me_reconnect_fast_retry_count,
me_single_endpoint_shadow_writers: AtomicU8::new(me_single_endpoint_shadow_writers),
me_single_endpoint_outage_mode_enabled: AtomicBool::new(
me_single_endpoint_outage_mode_enabled,
),
me_single_endpoint_outage_disable_quarantine: AtomicBool::new(
me_single_endpoint_outage_disable_quarantine,
),
me_single_endpoint_outage_backoff_min_ms: AtomicU64::new(
me_single_endpoint_outage_backoff_min_ms,
),
me_single_endpoint_outage_backoff_max_ms: AtomicU64::new(
me_single_endpoint_outage_backoff_max_ms,
),
me_single_endpoint_shadow_rotate_every_secs: AtomicU64::new(
me_single_endpoint_shadow_rotate_every_secs,
),
pool_size: 2, pool_size: 2,
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
@ -317,6 +345,12 @@ impl MePool {
bind_stale_ttl_secs: u64, bind_stale_ttl_secs: u64,
secret_atomic_snapshot: bool, secret_atomic_snapshot: bool,
deterministic_writer_sort: bool, deterministic_writer_sort: bool,
single_endpoint_shadow_writers: u8,
single_endpoint_outage_mode_enabled: bool,
single_endpoint_outage_disable_quarantine: bool,
single_endpoint_outage_backoff_min_ms: u64,
single_endpoint_outage_backoff_max_ms: u64,
single_endpoint_shadow_rotate_every_secs: u64,
) { ) {
self.hardswap.store(hardswap, Ordering::Relaxed); self.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs self.me_pool_drain_ttl_secs
@ -341,6 +375,18 @@ impl MePool {
.store(secret_atomic_snapshot, Ordering::Relaxed); .store(secret_atomic_snapshot, Ordering::Relaxed);
self.me_deterministic_writer_sort self.me_deterministic_writer_sort
.store(deterministic_writer_sort, Ordering::Relaxed); .store(deterministic_writer_sort, Ordering::Relaxed);
self.me_single_endpoint_shadow_writers
.store(single_endpoint_shadow_writers, Ordering::Relaxed);
self.me_single_endpoint_outage_mode_enabled
.store(single_endpoint_outage_mode_enabled, Ordering::Relaxed);
self.me_single_endpoint_outage_disable_quarantine
.store(single_endpoint_outage_disable_quarantine, Ordering::Relaxed);
self.me_single_endpoint_outage_backoff_min_ms
.store(single_endpoint_outage_backoff_min_ms, Ordering::Relaxed);
self.me_single_endpoint_outage_backoff_max_ms
.store(single_endpoint_outage_backoff_max_ms, Ordering::Relaxed);
self.me_single_endpoint_shadow_rotate_every_secs
.store(single_endpoint_shadow_rotate_every_secs, Ordering::Relaxed);
} }
pub fn reset_stun_state(&self) { pub fn reset_stun_state(&self) {
@ -405,6 +451,54 @@ impl MePool {
MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed)) MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed))
} }
pub(super) fn required_writers_for_dc(&self, endpoint_count: usize) -> usize {
if endpoint_count == 0 {
return 0;
}
if endpoint_count == 1 {
let shadow = self
.me_single_endpoint_shadow_writers
.load(Ordering::Relaxed) as usize;
return (1 + shadow).max(3);
}
endpoint_count.max(3)
}
pub(super) fn single_endpoint_outage_mode_enabled(&self) -> bool {
self.me_single_endpoint_outage_mode_enabled
.load(Ordering::Relaxed)
}
pub(super) fn single_endpoint_outage_disable_quarantine(&self) -> bool {
self.me_single_endpoint_outage_disable_quarantine
.load(Ordering::Relaxed)
}
pub(super) fn single_endpoint_outage_backoff_bounds_ms(&self) -> (u64, u64) {
let min_ms = self
.me_single_endpoint_outage_backoff_min_ms
.load(Ordering::Relaxed);
let max_ms = self
.me_single_endpoint_outage_backoff_max_ms
.load(Ordering::Relaxed);
if min_ms <= max_ms {
(min_ms, max_ms)
} else {
(max_ms, min_ms)
}
}
pub(super) fn single_endpoint_shadow_rotate_interval(&self) -> Option<Duration> {
let secs = self
.me_single_endpoint_shadow_rotate_every_secs
.load(Ordering::Relaxed);
if secs == 0 {
None
} else {
Some(Duration::from_secs(secs))
}
}
pub(super) fn family_order(&self) -> Vec<IpFamily> { pub(super) fn family_order(&self) -> Vec<IpFamily> {
let mut order = Vec::new(); let mut order = Vec::new();
if self.decision.prefer_ipv6() { if self.decision.prefer_ipv6() {

View File

@ -148,10 +148,6 @@ impl MePool {
out out
} }
pub(super) fn required_writers_for_dc(endpoint_count: usize) -> usize {
endpoint_count.max(3)
}
fn hardswap_warmup_connect_delay_ms(&self) -> u64 { fn hardswap_warmup_connect_delay_ms(&self) -> u64 {
let min_ms = self.me_hardswap_warmup_delay_min_ms.load(Ordering::Relaxed); let min_ms = self.me_hardswap_warmup_delay_min_ms.load(Ordering::Relaxed);
let max_ms = self.me_hardswap_warmup_delay_max_ms.load(Ordering::Relaxed); let max_ms = self.me_hardswap_warmup_delay_max_ms.load(Ordering::Relaxed);
@ -221,7 +217,7 @@ impl MePool {
let mut endpoint_list: Vec<SocketAddr> = endpoints.iter().copied().collect(); let mut endpoint_list: Vec<SocketAddr> = endpoints.iter().copied().collect();
endpoint_list.sort_unstable(); endpoint_list.sort_unstable();
let required = Self::required_writers_for_dc(endpoint_list.len()); let required = self.required_writers_for_dc(endpoint_list.len());
let mut completed = false; let mut completed = false;
let mut last_fresh_count = self let mut last_fresh_count = self
.fresh_writer_count_for_endpoints(generation, endpoints) .fresh_writer_count_for_endpoints(generation, endpoints)
@ -409,7 +405,7 @@ impl MePool {
if endpoints.is_empty() { if endpoints.is_empty() {
continue; continue;
} }
let required = Self::required_writers_for_dc(endpoints.len()); let required = self.required_writers_for_dc(endpoints.len());
let fresh_count = writers let fresh_count = writers
.iter() .iter()
.filter(|w| !w.draining.load(Ordering::Relaxed)) .filter(|w| !w.draining.load(Ordering::Relaxed))