mirror of https://github.com/telemt/telemt.git
ME Dual-Trio Pool + ME Pool Shadow Writers: merge pull request #295 from telemt/flow-drift
ME Dual-Trio Pool + ME Pool Shadow Writers
This commit is contained in:
commit
1a68dc1c2d
|
|
@ -8,6 +8,7 @@ const DEFAULT_STUN_TCP_FALLBACK: bool = true;
|
||||||
const DEFAULT_MIDDLE_PROXY_WARM_STANDBY: usize = 16;
|
const DEFAULT_MIDDLE_PROXY_WARM_STANDBY: usize = 16;
|
||||||
const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8;
|
const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8;
|
||||||
const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16;
|
const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16;
|
||||||
|
const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2;
|
||||||
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 3;
|
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 3;
|
||||||
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 4;
|
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 4;
|
||||||
const DEFAULT_LISTEN_ADDR_IPV6: &str = "::";
|
const DEFAULT_LISTEN_ADDR_IPV6: &str = "::";
|
||||||
|
|
@ -160,6 +161,30 @@ pub(crate) fn default_me_reconnect_fast_retry_count() -> u32 {
|
||||||
DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT
|
DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_single_endpoint_shadow_writers() -> u8 {
|
||||||
|
DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_single_endpoint_outage_mode_enabled() -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_single_endpoint_outage_disable_quarantine() -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_single_endpoint_outage_backoff_min_ms() -> u64 {
|
||||||
|
250
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_single_endpoint_outage_backoff_max_ms() -> u64 {
|
||||||
|
3000
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_single_endpoint_shadow_rotate_every_secs() -> u64 {
|
||||||
|
900
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
|
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
|
||||||
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
|
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -255,6 +255,32 @@ impl ProxyConfig {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.general.me_single_endpoint_shadow_writers > 32 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_single_endpoint_shadow_writers must be within [0, 32]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.general.me_single_endpoint_outage_backoff_min_ms == 0 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.general.me_single_endpoint_outage_backoff_max_ms == 0 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_single_endpoint_outage_backoff_max_ms must be > 0".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.general.me_single_endpoint_outage_backoff_min_ms
|
||||||
|
> config.general.me_single_endpoint_outage_backoff_max_ms
|
||||||
|
{
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_single_endpoint_outage_backoff_min_ms must be <= general.me_single_endpoint_outage_backoff_max_ms".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if config.general.beobachten_minutes == 0 {
|
if config.general.beobachten_minutes == 0 {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
"general.beobachten_minutes must be > 0".to_string(),
|
"general.beobachten_minutes must be > 0".to_string(),
|
||||||
|
|
@ -592,6 +618,30 @@ mod tests {
|
||||||
cfg.general.me_reconnect_fast_retry_count,
|
cfg.general.me_reconnect_fast_retry_count,
|
||||||
default_me_reconnect_fast_retry_count()
|
default_me_reconnect_fast_retry_count()
|
||||||
);
|
);
|
||||||
|
assert_eq!(
|
||||||
|
cfg.general.me_single_endpoint_shadow_writers,
|
||||||
|
default_me_single_endpoint_shadow_writers()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
cfg.general.me_single_endpoint_outage_mode_enabled,
|
||||||
|
default_me_single_endpoint_outage_mode_enabled()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
cfg.general.me_single_endpoint_outage_disable_quarantine,
|
||||||
|
default_me_single_endpoint_outage_disable_quarantine()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
cfg.general.me_single_endpoint_outage_backoff_min_ms,
|
||||||
|
default_me_single_endpoint_outage_backoff_min_ms()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
cfg.general.me_single_endpoint_outage_backoff_max_ms,
|
||||||
|
default_me_single_endpoint_outage_backoff_max_ms()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
cfg.general.me_single_endpoint_shadow_rotate_every_secs,
|
||||||
|
default_me_single_endpoint_shadow_rotate_every_secs()
|
||||||
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
cfg.general.upstream_connect_retry_attempts,
|
cfg.general.upstream_connect_retry_attempts,
|
||||||
default_upstream_connect_retry_attempts()
|
default_upstream_connect_retry_attempts()
|
||||||
|
|
@ -630,6 +680,30 @@ mod tests {
|
||||||
general.me_reconnect_fast_retry_count,
|
general.me_reconnect_fast_retry_count,
|
||||||
default_me_reconnect_fast_retry_count()
|
default_me_reconnect_fast_retry_count()
|
||||||
);
|
);
|
||||||
|
assert_eq!(
|
||||||
|
general.me_single_endpoint_shadow_writers,
|
||||||
|
default_me_single_endpoint_shadow_writers()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
general.me_single_endpoint_outage_mode_enabled,
|
||||||
|
default_me_single_endpoint_outage_mode_enabled()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
general.me_single_endpoint_outage_disable_quarantine,
|
||||||
|
default_me_single_endpoint_outage_disable_quarantine()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
general.me_single_endpoint_outage_backoff_min_ms,
|
||||||
|
default_me_single_endpoint_outage_backoff_min_ms()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
general.me_single_endpoint_outage_backoff_max_ms,
|
||||||
|
default_me_single_endpoint_outage_backoff_max_ms()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
general.me_single_endpoint_shadow_rotate_every_secs,
|
||||||
|
default_me_single_endpoint_shadow_rotate_every_secs()
|
||||||
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
general.upstream_connect_retry_attempts,
|
general.upstream_connect_retry_attempts,
|
||||||
default_upstream_connect_retry_attempts()
|
default_upstream_connect_retry_attempts()
|
||||||
|
|
@ -814,6 +888,49 @@ mod tests {
|
||||||
let _ = std::fs::remove_file(path);
|
let _ = std::fs::remove_file(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn me_single_endpoint_outage_backoff_range_is_validated() {
|
||||||
|
let toml = r#"
|
||||||
|
[general]
|
||||||
|
me_single_endpoint_outage_backoff_min_ms = 4000
|
||||||
|
me_single_endpoint_outage_backoff_max_ms = 3000
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
tls_domain = "example.com"
|
||||||
|
|
||||||
|
[access.users]
|
||||||
|
user = "00000000000000000000000000000000"
|
||||||
|
"#;
|
||||||
|
let dir = std::env::temp_dir();
|
||||||
|
let path = dir.join("telemt_me_single_endpoint_outage_backoff_range_test.toml");
|
||||||
|
std::fs::write(&path, toml).unwrap();
|
||||||
|
let err = ProxyConfig::load(&path).unwrap_err().to_string();
|
||||||
|
assert!(err.contains(
|
||||||
|
"general.me_single_endpoint_outage_backoff_min_ms must be <= general.me_single_endpoint_outage_backoff_max_ms"
|
||||||
|
));
|
||||||
|
let _ = std::fs::remove_file(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn me_single_endpoint_shadow_writers_too_large_is_rejected() {
|
||||||
|
let toml = r#"
|
||||||
|
[general]
|
||||||
|
me_single_endpoint_shadow_writers = 33
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
tls_domain = "example.com"
|
||||||
|
|
||||||
|
[access.users]
|
||||||
|
user = "00000000000000000000000000000000"
|
||||||
|
"#;
|
||||||
|
let dir = std::env::temp_dir();
|
||||||
|
let path = dir.join("telemt_me_single_endpoint_shadow_writers_limit_test.toml");
|
||||||
|
std::fs::write(&path, toml).unwrap();
|
||||||
|
let err = ProxyConfig::load(&path).unwrap_err().to_string();
|
||||||
|
assert!(err.contains("general.me_single_endpoint_shadow_writers must be within [0, 32]"));
|
||||||
|
let _ = std::fs::remove_file(path);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn upstream_connect_retry_attempts_zero_is_rejected() {
|
fn upstream_connect_retry_attempts_zero_is_rejected() {
|
||||||
let toml = r#"
|
let toml = r#"
|
||||||
|
|
|
||||||
|
|
@ -394,6 +394,31 @@ pub struct GeneralConfig {
|
||||||
#[serde(default = "default_me_reconnect_fast_retry_count")]
|
#[serde(default = "default_me_reconnect_fast_retry_count")]
|
||||||
pub me_reconnect_fast_retry_count: u32,
|
pub me_reconnect_fast_retry_count: u32,
|
||||||
|
|
||||||
|
/// Number of additional reserve writers for DC groups with exactly one endpoint.
|
||||||
|
#[serde(default = "default_me_single_endpoint_shadow_writers")]
|
||||||
|
pub me_single_endpoint_shadow_writers: u8,
|
||||||
|
|
||||||
|
/// Enable aggressive outage recovery mode for single-endpoint DC groups.
|
||||||
|
#[serde(default = "default_me_single_endpoint_outage_mode_enabled")]
|
||||||
|
pub me_single_endpoint_outage_mode_enabled: bool,
|
||||||
|
|
||||||
|
/// Ignore endpoint quarantine while in single-endpoint outage mode.
|
||||||
|
#[serde(default = "default_me_single_endpoint_outage_disable_quarantine")]
|
||||||
|
pub me_single_endpoint_outage_disable_quarantine: bool,
|
||||||
|
|
||||||
|
/// Minimum reconnect backoff in ms for single-endpoint outage mode.
|
||||||
|
#[serde(default = "default_me_single_endpoint_outage_backoff_min_ms")]
|
||||||
|
pub me_single_endpoint_outage_backoff_min_ms: u64,
|
||||||
|
|
||||||
|
/// Maximum reconnect backoff in ms for single-endpoint outage mode.
|
||||||
|
#[serde(default = "default_me_single_endpoint_outage_backoff_max_ms")]
|
||||||
|
pub me_single_endpoint_outage_backoff_max_ms: u64,
|
||||||
|
|
||||||
|
/// Periodic shadow writer rotation interval in seconds for single-endpoint DC groups.
|
||||||
|
/// Set to 0 to disable periodic shadow rotation.
|
||||||
|
#[serde(default = "default_me_single_endpoint_shadow_rotate_every_secs")]
|
||||||
|
pub me_single_endpoint_shadow_rotate_every_secs: u64,
|
||||||
|
|
||||||
/// Connect attempts for the selected upstream before returning error/fallback.
|
/// Connect attempts for the selected upstream before returning error/fallback.
|
||||||
#[serde(default = "default_upstream_connect_retry_attempts")]
|
#[serde(default = "default_upstream_connect_retry_attempts")]
|
||||||
pub upstream_connect_retry_attempts: u32,
|
pub upstream_connect_retry_attempts: u32,
|
||||||
|
|
@ -603,6 +628,12 @@ impl Default for GeneralConfig {
|
||||||
me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
|
me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
|
||||||
me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
|
me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
|
||||||
me_reconnect_fast_retry_count: default_me_reconnect_fast_retry_count(),
|
me_reconnect_fast_retry_count: default_me_reconnect_fast_retry_count(),
|
||||||
|
me_single_endpoint_shadow_writers: default_me_single_endpoint_shadow_writers(),
|
||||||
|
me_single_endpoint_outage_mode_enabled: default_me_single_endpoint_outage_mode_enabled(),
|
||||||
|
me_single_endpoint_outage_disable_quarantine: default_me_single_endpoint_outage_disable_quarantine(),
|
||||||
|
me_single_endpoint_outage_backoff_min_ms: default_me_single_endpoint_outage_backoff_min_ms(),
|
||||||
|
me_single_endpoint_outage_backoff_max_ms: default_me_single_endpoint_outage_backoff_max_ms(),
|
||||||
|
me_single_endpoint_shadow_rotate_every_secs: default_me_single_endpoint_shadow_rotate_every_secs(),
|
||||||
upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(),
|
upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(),
|
||||||
upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(),
|
upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(),
|
||||||
upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(),
|
upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(),
|
||||||
|
|
|
||||||
|
|
@ -538,6 +538,12 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
config.general.me_reconnect_backoff_base_ms,
|
config.general.me_reconnect_backoff_base_ms,
|
||||||
config.general.me_reconnect_backoff_cap_ms,
|
config.general.me_reconnect_backoff_cap_ms,
|
||||||
config.general.me_reconnect_fast_retry_count,
|
config.general.me_reconnect_fast_retry_count,
|
||||||
|
config.general.me_single_endpoint_shadow_writers,
|
||||||
|
config.general.me_single_endpoint_outage_mode_enabled,
|
||||||
|
config.general.me_single_endpoint_outage_disable_quarantine,
|
||||||
|
config.general.me_single_endpoint_outage_backoff_min_ms,
|
||||||
|
config.general.me_single_endpoint_outage_backoff_max_ms,
|
||||||
|
config.general.me_single_endpoint_shadow_rotate_every_secs,
|
||||||
config.general.hardswap,
|
config.general.hardswap,
|
||||||
config.general.me_pool_drain_ttl_secs,
|
config.general.me_pool_drain_ttl_secs,
|
||||||
config.general.effective_me_pool_force_close_secs(),
|
config.general.effective_me_pool_force_close_secs(),
|
||||||
|
|
|
||||||
202
src/metrics.rs
202
src/metrics.rs
|
|
@ -274,6 +274,43 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_handshake_reject_total ME handshake rejects from upstream");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_handshake_reject_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_handshake_reject_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_handshake_reject_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_handshake_error_code_total ME handshake reject errors by code");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_handshake_error_code_total counter");
|
||||||
|
if me_allows_normal {
|
||||||
|
for (error_code, count) in stats.get_me_handshake_error_code_counts() {
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_handshake_error_code_total{{error_code=\"{}\"}} {}",
|
||||||
|
error_code,
|
||||||
|
count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_reader_eof_total ME reader EOF terminations");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_reader_eof_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_reader_eof_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_reader_eof_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
let _ = writeln!(out, "# HELP telemt_me_crc_mismatch_total ME CRC mismatches");
|
let _ = writeln!(out, "# HELP telemt_me_crc_mismatch_total ME CRC mismatches");
|
||||||
let _ = writeln!(out, "# TYPE telemt_me_crc_mismatch_total counter");
|
let _ = writeln!(out, "# TYPE telemt_me_crc_mismatch_total counter");
|
||||||
let _ = writeln!(
|
let _ = writeln!(
|
||||||
|
|
@ -385,6 +422,171 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_endpoint_quarantine_total ME endpoint quarantines due to rapid flaps"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_endpoint_quarantine_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_endpoint_quarantine_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_endpoint_quarantine_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_kdf_drift_total ME KDF input drift detections");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_kdf_drift_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_kdf_drift_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_kdf_drift_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_hardswap_pending_reuse_total Hardswap cycles that reused an existing pending generation"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_hardswap_pending_reuse_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_hardswap_pending_reuse_total {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_hardswap_pending_reuse_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_hardswap_pending_ttl_expired_total Pending hardswap generations reset by TTL expiration"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_hardswap_pending_ttl_expired_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_hardswap_pending_ttl_expired_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_hardswap_pending_ttl_expired_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_single_endpoint_outage_enter_total Single-endpoint DC outage transitions to active state"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_single_endpoint_outage_enter_total counter"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_single_endpoint_outage_enter_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_single_endpoint_outage_enter_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_single_endpoint_outage_exit_total Single-endpoint DC outage recovery transitions"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_single_endpoint_outage_exit_total counter"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_single_endpoint_outage_exit_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_single_endpoint_outage_exit_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_single_endpoint_outage_reconnect_attempt_total Reconnect attempts performed during single-endpoint outages"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_single_endpoint_outage_reconnect_attempt_total counter"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_single_endpoint_outage_reconnect_attempt_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_single_endpoint_outage_reconnect_attempt_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_single_endpoint_outage_reconnect_success_total Successful reconnect attempts during single-endpoint outages"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_single_endpoint_outage_reconnect_success_total counter"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_single_endpoint_outage_reconnect_success_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_single_endpoint_outage_reconnect_success_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_single_endpoint_quarantine_bypass_total Outage reconnect attempts that bypassed quarantine"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_single_endpoint_quarantine_bypass_total counter"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_single_endpoint_quarantine_bypass_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_single_endpoint_quarantine_bypass_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_single_endpoint_shadow_rotate_total Successful periodic shadow rotations for single-endpoint DC groups"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_single_endpoint_shadow_rotate_total counter"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_single_endpoint_shadow_rotate_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_single_endpoint_shadow_rotate_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths");
|
let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths");
|
||||||
let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
|
let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
|
||||||
let _ = writeln!(
|
let _ = writeln!(
|
||||||
|
|
|
||||||
145
src/stats/mod.rs
145
src/stats/mod.rs
|
|
@ -32,8 +32,21 @@ pub struct Stats {
|
||||||
me_keepalive_timeout: AtomicU64,
|
me_keepalive_timeout: AtomicU64,
|
||||||
me_reconnect_attempts: AtomicU64,
|
me_reconnect_attempts: AtomicU64,
|
||||||
me_reconnect_success: AtomicU64,
|
me_reconnect_success: AtomicU64,
|
||||||
|
me_handshake_reject_total: AtomicU64,
|
||||||
|
me_reader_eof_total: AtomicU64,
|
||||||
me_crc_mismatch: AtomicU64,
|
me_crc_mismatch: AtomicU64,
|
||||||
me_seq_mismatch: AtomicU64,
|
me_seq_mismatch: AtomicU64,
|
||||||
|
me_endpoint_quarantine_total: AtomicU64,
|
||||||
|
me_kdf_drift_total: AtomicU64,
|
||||||
|
me_hardswap_pending_reuse_total: AtomicU64,
|
||||||
|
me_hardswap_pending_ttl_expired_total: AtomicU64,
|
||||||
|
me_single_endpoint_outage_enter_total: AtomicU64,
|
||||||
|
me_single_endpoint_outage_exit_total: AtomicU64,
|
||||||
|
me_single_endpoint_outage_reconnect_attempt_total: AtomicU64,
|
||||||
|
me_single_endpoint_outage_reconnect_success_total: AtomicU64,
|
||||||
|
me_single_endpoint_quarantine_bypass_total: AtomicU64,
|
||||||
|
me_single_endpoint_shadow_rotate_total: AtomicU64,
|
||||||
|
me_handshake_error_codes: DashMap<i32, AtomicU64>,
|
||||||
me_route_drop_no_conn: AtomicU64,
|
me_route_drop_no_conn: AtomicU64,
|
||||||
me_route_drop_channel_closed: AtomicU64,
|
me_route_drop_channel_closed: AtomicU64,
|
||||||
me_route_drop_queue_full: AtomicU64,
|
me_route_drop_queue_full: AtomicU64,
|
||||||
|
|
@ -172,6 +185,26 @@ impl Stats {
|
||||||
self.me_reconnect_success.fetch_add(1, Ordering::Relaxed);
|
self.me_reconnect_success.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub fn increment_me_handshake_reject_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_handshake_reject_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_handshake_error_code(&self, code: i32) {
|
||||||
|
if !self.telemetry_me_allows_normal() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let entry = self
|
||||||
|
.me_handshake_error_codes
|
||||||
|
.entry(code)
|
||||||
|
.or_insert_with(|| AtomicU64::new(0));
|
||||||
|
entry.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_reader_eof_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_reader_eof_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
pub fn increment_me_crc_mismatch(&self) {
|
pub fn increment_me_crc_mismatch(&self) {
|
||||||
if self.telemetry_me_allows_normal() {
|
if self.telemetry_me_allows_normal() {
|
||||||
self.me_crc_mismatch.fetch_add(1, Ordering::Relaxed);
|
self.me_crc_mismatch.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
@ -333,6 +366,65 @@ impl Stats {
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub fn increment_me_endpoint_quarantine_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_endpoint_quarantine_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_kdf_drift_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_kdf_drift_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_hardswap_pending_reuse_total(&self) {
|
||||||
|
if self.telemetry_me_allows_debug() {
|
||||||
|
self.me_hardswap_pending_reuse_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_hardswap_pending_ttl_expired_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_hardswap_pending_ttl_expired_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_single_endpoint_outage_enter_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_single_endpoint_outage_enter_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_single_endpoint_outage_exit_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_single_endpoint_outage_exit_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_single_endpoint_outage_reconnect_attempt_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_single_endpoint_outage_reconnect_attempt_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_single_endpoint_outage_reconnect_success_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_single_endpoint_outage_reconnect_success_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_single_endpoint_quarantine_bypass_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_single_endpoint_quarantine_bypass_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_single_endpoint_shadow_rotate_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_single_endpoint_shadow_rotate_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
||||||
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
||||||
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
|
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
|
||||||
|
|
@ -341,8 +433,61 @@ impl Stats {
|
||||||
pub fn get_me_keepalive_timeout(&self) -> u64 { self.me_keepalive_timeout.load(Ordering::Relaxed) }
|
pub fn get_me_keepalive_timeout(&self) -> u64 { self.me_keepalive_timeout.load(Ordering::Relaxed) }
|
||||||
pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) }
|
pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) }
|
||||||
pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) }
|
pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) }
|
||||||
|
pub fn get_me_handshake_reject_total(&self) -> u64 {
|
||||||
|
self.me_handshake_reject_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_reader_eof_total(&self) -> u64 {
|
||||||
|
self.me_reader_eof_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
pub fn get_me_crc_mismatch(&self) -> u64 { self.me_crc_mismatch.load(Ordering::Relaxed) }
|
pub fn get_me_crc_mismatch(&self) -> u64 { self.me_crc_mismatch.load(Ordering::Relaxed) }
|
||||||
pub fn get_me_seq_mismatch(&self) -> u64 { self.me_seq_mismatch.load(Ordering::Relaxed) }
|
pub fn get_me_seq_mismatch(&self) -> u64 { self.me_seq_mismatch.load(Ordering::Relaxed) }
|
||||||
|
pub fn get_me_endpoint_quarantine_total(&self) -> u64 {
|
||||||
|
self.me_endpoint_quarantine_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_kdf_drift_total(&self) -> u64 {
|
||||||
|
self.me_kdf_drift_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_hardswap_pending_reuse_total(&self) -> u64 {
|
||||||
|
self.me_hardswap_pending_reuse_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_hardswap_pending_ttl_expired_total(&self) -> u64 {
|
||||||
|
self.me_hardswap_pending_ttl_expired_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_single_endpoint_outage_enter_total(&self) -> u64 {
|
||||||
|
self.me_single_endpoint_outage_enter_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_single_endpoint_outage_exit_total(&self) -> u64 {
|
||||||
|
self.me_single_endpoint_outage_exit_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_single_endpoint_outage_reconnect_attempt_total(&self) -> u64 {
|
||||||
|
self.me_single_endpoint_outage_reconnect_attempt_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_single_endpoint_outage_reconnect_success_total(&self) -> u64 {
|
||||||
|
self.me_single_endpoint_outage_reconnect_success_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_single_endpoint_quarantine_bypass_total(&self) -> u64 {
|
||||||
|
self.me_single_endpoint_quarantine_bypass_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_single_endpoint_shadow_rotate_total(&self) -> u64 {
|
||||||
|
self.me_single_endpoint_shadow_rotate_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_handshake_error_code_counts(&self) -> Vec<(i32, u64)> {
|
||||||
|
let mut out: Vec<(i32, u64)> = self
|
||||||
|
.me_handshake_error_codes
|
||||||
|
.iter()
|
||||||
|
.map(|entry| (*entry.key(), entry.value().load(Ordering::Relaxed)))
|
||||||
|
.collect();
|
||||||
|
out.sort_by_key(|(code, _)| *code);
|
||||||
|
out
|
||||||
|
}
|
||||||
pub fn get_me_route_drop_no_conn(&self) -> u64 { self.me_route_drop_no_conn.load(Ordering::Relaxed) }
|
pub fn get_me_route_drop_no_conn(&self) -> u64 { self.me_route_drop_no_conn.load(Ordering::Relaxed) }
|
||||||
pub fn get_me_route_drop_channel_closed(&self) -> u64 {
|
pub fn get_me_route_drop_channel_closed(&self) -> u64 {
|
||||||
self.me_route_drop_channel_closed.load(Ordering::Relaxed)
|
self.me_route_drop_channel_closed.load(Ordering::Relaxed)
|
||||||
|
|
|
||||||
|
|
@ -276,6 +276,12 @@ async fn run_update_cycle(
|
||||||
cfg.general.me_bind_stale_ttl_secs,
|
cfg.general.me_bind_stale_ttl_secs,
|
||||||
cfg.general.me_secret_atomic_snapshot,
|
cfg.general.me_secret_atomic_snapshot,
|
||||||
cfg.general.me_deterministic_writer_sort,
|
cfg.general.me_deterministic_writer_sort,
|
||||||
|
cfg.general.me_single_endpoint_shadow_writers,
|
||||||
|
cfg.general.me_single_endpoint_outage_mode_enabled,
|
||||||
|
cfg.general.me_single_endpoint_outage_disable_quarantine,
|
||||||
|
cfg.general.me_single_endpoint_outage_backoff_min_ms,
|
||||||
|
cfg.general.me_single_endpoint_outage_backoff_max_ms,
|
||||||
|
cfg.general.me_single_endpoint_shadow_rotate_every_secs,
|
||||||
);
|
);
|
||||||
|
|
||||||
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
|
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
|
||||||
|
|
@ -478,6 +484,12 @@ pub async fn me_config_updater(
|
||||||
cfg.general.me_bind_stale_ttl_secs,
|
cfg.general.me_bind_stale_ttl_secs,
|
||||||
cfg.general.me_secret_atomic_snapshot,
|
cfg.general.me_secret_atomic_snapshot,
|
||||||
cfg.general.me_deterministic_writer_sort,
|
cfg.general.me_deterministic_writer_sort,
|
||||||
|
cfg.general.me_single_endpoint_shadow_writers,
|
||||||
|
cfg.general.me_single_endpoint_outage_mode_enabled,
|
||||||
|
cfg.general.me_single_endpoint_outage_disable_quarantine,
|
||||||
|
cfg.general.me_single_endpoint_outage_backoff_min_ms,
|
||||||
|
cfg.general.me_single_endpoint_outage_backoff_max_ms,
|
||||||
|
cfg.general.me_single_endpoint_shadow_rotate_every_secs,
|
||||||
);
|
);
|
||||||
let new_secs = cfg.general.effective_update_every_secs().max(1);
|
let new_secs = cfg.general.effective_update_every_secs().max(1);
|
||||||
if new_secs == update_every_secs {
|
if new_secs == update_every_secs {
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
use std::net::{IpAddr, SocketAddr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
use std::collections::hash_map::DefaultHasher;
|
||||||
|
use std::hash::{Hash, Hasher};
|
||||||
use socket2::{SockRef, TcpKeepalive};
|
use socket2::{SockRef, TcpKeepalive};
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
use libc;
|
use libc;
|
||||||
|
|
@ -34,6 +36,8 @@ use super::codec::{
|
||||||
use super::wire::{extract_ip_material, IpMaterial};
|
use super::wire::{extract_ip_material, IpMaterial};
|
||||||
use super::MePool;
|
use super::MePool;
|
||||||
|
|
||||||
|
const ME_KDF_DRIFT_STRICT: bool = false;
|
||||||
|
|
||||||
/// Result of a successful ME handshake with timings.
|
/// Result of a successful ME handshake with timings.
|
||||||
pub(crate) struct HandshakeOutput {
|
pub(crate) struct HandshakeOutput {
|
||||||
pub rd: ReadHalf<TcpStream>,
|
pub rd: ReadHalf<TcpStream>,
|
||||||
|
|
@ -47,6 +51,22 @@ pub(crate) struct HandshakeOutput {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MePool {
|
impl MePool {
|
||||||
|
fn kdf_material_fingerprint(
|
||||||
|
local_addr_nat: SocketAddr,
|
||||||
|
peer_addr_nat: SocketAddr,
|
||||||
|
client_port_for_kdf: u16,
|
||||||
|
reflected: Option<SocketAddr>,
|
||||||
|
socks_bound_addr: Option<SocketAddr>,
|
||||||
|
) -> u64 {
|
||||||
|
let mut hasher = DefaultHasher::new();
|
||||||
|
local_addr_nat.hash(&mut hasher);
|
||||||
|
peer_addr_nat.hash(&mut hasher);
|
||||||
|
client_port_for_kdf.hash(&mut hasher);
|
||||||
|
reflected.hash(&mut hasher);
|
||||||
|
socks_bound_addr.hash(&mut hasher);
|
||||||
|
hasher.finish()
|
||||||
|
}
|
||||||
|
|
||||||
async fn resolve_dc_idx_for_endpoint(&self, addr: SocketAddr) -> Option<i16> {
|
async fn resolve_dc_idx_for_endpoint(&self, addr: SocketAddr) -> Option<i16> {
|
||||||
if addr.is_ipv4() {
|
if addr.is_ipv4() {
|
||||||
let map = self.proxy_map_v4.read().await;
|
let map = self.proxy_map_v4.read().await;
|
||||||
|
|
@ -343,6 +363,33 @@ impl MePool {
|
||||||
.map(|bound| bound.port())
|
.map(|bound| bound.port())
|
||||||
.filter(|port| *port != 0)
|
.filter(|port| *port != 0)
|
||||||
.unwrap_or(local_addr_nat.port());
|
.unwrap_or(local_addr_nat.port());
|
||||||
|
let kdf_fingerprint = Self::kdf_material_fingerprint(
|
||||||
|
local_addr_nat,
|
||||||
|
peer_addr_nat,
|
||||||
|
client_port_for_kdf,
|
||||||
|
reflected,
|
||||||
|
socks_bound_addr,
|
||||||
|
);
|
||||||
|
let mut kdf_fingerprint_guard = self.kdf_material_fingerprint.lock().await;
|
||||||
|
if let Some(prev_fingerprint) = kdf_fingerprint_guard.get(&peer_addr_nat).copied()
|
||||||
|
&& prev_fingerprint != kdf_fingerprint
|
||||||
|
{
|
||||||
|
self.stats.increment_me_kdf_drift_total();
|
||||||
|
warn!(
|
||||||
|
%peer_addr_nat,
|
||||||
|
%local_addr_nat,
|
||||||
|
client_port_for_kdf,
|
||||||
|
"ME KDF input drift detected for endpoint"
|
||||||
|
);
|
||||||
|
if ME_KDF_DRIFT_STRICT {
|
||||||
|
return Err(ProxyError::InvalidHandshake(
|
||||||
|
"ME KDF input drift detected (strict mode)".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
kdf_fingerprint_guard.insert(peer_addr_nat, kdf_fingerprint);
|
||||||
|
drop(kdf_fingerprint_guard);
|
||||||
|
|
||||||
let client_port_bytes = client_port_for_kdf.to_le_bytes();
|
let client_port_bytes = client_port_for_kdf.to_le_bytes();
|
||||||
|
|
||||||
let server_ip = extract_ip_material(peer_addr_nat);
|
let server_ip = extract_ip_material(peer_addr_nat);
|
||||||
|
|
@ -540,6 +587,8 @@ impl MePool {
|
||||||
} else {
|
} else {
|
||||||
-1
|
-1
|
||||||
};
|
};
|
||||||
|
self.stats.increment_me_handshake_reject_total();
|
||||||
|
self.stats.increment_me_handshake_error_code(err_code);
|
||||||
return Err(ProxyError::InvalidHandshake(format!(
|
return Err(ProxyError::InvalidHandshake(format!(
|
||||||
"ME rejected handshake (error={err_code})"
|
"ME rejected handshake (error={err_code})"
|
||||||
)));
|
)));
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,11 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::collections::HashSet;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use tracing::{debug, info, warn};
|
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
|
use tracing::{debug, info, warn};
|
||||||
|
|
||||||
use crate::crypto::SecureRandom;
|
use crate::crypto::SecureRandom;
|
||||||
use crate::network::IpFamily;
|
use crate::network::IpFamily;
|
||||||
|
|
@ -15,11 +16,16 @@ const HEALTH_INTERVAL_SECS: u64 = 1;
|
||||||
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
|
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1;
|
const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1;
|
||||||
|
const SHADOW_ROTATE_RETRY_SECS: u64 = 30;
|
||||||
|
|
||||||
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
|
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
|
||||||
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
|
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
|
||||||
let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||||
let mut inflight: HashMap<(i32, IpFamily), usize> = HashMap::new();
|
let mut inflight: HashMap<(i32, IpFamily), usize> = HashMap::new();
|
||||||
|
let mut outage_backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
|
||||||
|
let mut outage_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||||
|
let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new();
|
||||||
|
let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
|
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
|
||||||
pool.prune_closed_writers().await;
|
pool.prune_closed_writers().await;
|
||||||
|
|
@ -30,6 +36,10 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||||
&mut backoff,
|
&mut backoff,
|
||||||
&mut next_attempt,
|
&mut next_attempt,
|
||||||
&mut inflight,
|
&mut inflight,
|
||||||
|
&mut outage_backoff,
|
||||||
|
&mut outage_next_attempt,
|
||||||
|
&mut single_endpoint_outage,
|
||||||
|
&mut shadow_rotate_deadline,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
check_family(
|
check_family(
|
||||||
|
|
@ -39,6 +49,10 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||||
&mut backoff,
|
&mut backoff,
|
||||||
&mut next_attempt,
|
&mut next_attempt,
|
||||||
&mut inflight,
|
&mut inflight,
|
||||||
|
&mut outage_backoff,
|
||||||
|
&mut outage_next_attempt,
|
||||||
|
&mut single_endpoint_outage,
|
||||||
|
&mut shadow_rotate_deadline,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
@ -51,6 +65,10 @@ async fn check_family(
|
||||||
backoff: &mut HashMap<(i32, IpFamily), u64>,
|
backoff: &mut HashMap<(i32, IpFamily), u64>,
|
||||||
next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
|
next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
inflight: &mut HashMap<(i32, IpFamily), usize>,
|
inflight: &mut HashMap<(i32, IpFamily), usize>,
|
||||||
|
outage_backoff: &mut HashMap<(i32, IpFamily), u64>,
|
||||||
|
outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
|
single_endpoint_outage: &mut HashSet<(i32, IpFamily)>,
|
||||||
|
shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
) {
|
) {
|
||||||
let enabled = match family {
|
let enabled = match family {
|
||||||
IpFamily::V4 => pool.decision.ipv4_me,
|
IpFamily::V4 => pool.decision.ipv4_me,
|
||||||
|
|
@ -78,31 +96,86 @@ async fn check_family(
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut live_addr_counts = HashMap::<SocketAddr, usize>::new();
|
let mut live_addr_counts = HashMap::<SocketAddr, usize>::new();
|
||||||
for writer in pool
|
let mut live_writer_ids_by_addr = HashMap::<SocketAddr, Vec<u64>>::new();
|
||||||
.writers
|
for writer in pool.writers.read().await.iter().filter(|w| {
|
||||||
.read()
|
!w.draining.load(std::sync::atomic::Ordering::Relaxed)
|
||||||
.await
|
}) {
|
||||||
.iter()
|
|
||||||
.filter(|w| !w.draining.load(std::sync::atomic::Ordering::Relaxed))
|
|
||||||
{
|
|
||||||
*live_addr_counts.entry(writer.addr).or_insert(0) += 1;
|
*live_addr_counts.entry(writer.addr).or_insert(0) += 1;
|
||||||
|
live_writer_ids_by_addr
|
||||||
|
.entry(writer.addr)
|
||||||
|
.or_default()
|
||||||
|
.push(writer.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (dc, endpoints) in dc_endpoints {
|
for (dc, endpoints) in dc_endpoints {
|
||||||
if endpoints.is_empty() {
|
if endpoints.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let required = MePool::required_writers_for_dc(endpoints.len());
|
let required = pool.required_writers_for_dc(endpoints.len());
|
||||||
let alive = endpoints
|
let alive = endpoints
|
||||||
.iter()
|
.iter()
|
||||||
.map(|addr| *live_addr_counts.get(addr).unwrap_or(&0))
|
.map(|addr| *live_addr_counts.get(addr).unwrap_or(&0))
|
||||||
.sum::<usize>();
|
.sum::<usize>();
|
||||||
|
let key = (dc, family);
|
||||||
|
|
||||||
|
if endpoints.len() == 1 && pool.single_endpoint_outage_mode_enabled() && alive == 0 {
|
||||||
|
if single_endpoint_outage.insert(key) {
|
||||||
|
pool.stats.increment_me_single_endpoint_outage_enter_total();
|
||||||
|
warn!(
|
||||||
|
dc = %dc,
|
||||||
|
?family,
|
||||||
|
required,
|
||||||
|
endpoint_count = endpoints.len(),
|
||||||
|
"Single-endpoint DC outage detected"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
recover_single_endpoint_outage(
|
||||||
|
pool,
|
||||||
|
rng,
|
||||||
|
key,
|
||||||
|
endpoints[0],
|
||||||
|
required,
|
||||||
|
outage_backoff,
|
||||||
|
outage_next_attempt,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if single_endpoint_outage.remove(&key) {
|
||||||
|
pool.stats.increment_me_single_endpoint_outage_exit_total();
|
||||||
|
outage_backoff.remove(&key);
|
||||||
|
outage_next_attempt.remove(&key);
|
||||||
|
shadow_rotate_deadline.remove(&key);
|
||||||
|
info!(
|
||||||
|
dc = %dc,
|
||||||
|
?family,
|
||||||
|
alive,
|
||||||
|
required,
|
||||||
|
endpoint_count = endpoints.len(),
|
||||||
|
"Single-endpoint DC outage recovered"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if alive >= required {
|
if alive >= required {
|
||||||
|
maybe_rotate_single_endpoint_shadow(
|
||||||
|
pool,
|
||||||
|
rng,
|
||||||
|
key,
|
||||||
|
dc,
|
||||||
|
family,
|
||||||
|
&endpoints,
|
||||||
|
alive,
|
||||||
|
required,
|
||||||
|
&live_writer_ids_by_addr,
|
||||||
|
shadow_rotate_deadline,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let missing = required - alive;
|
let missing = required - alive;
|
||||||
|
|
||||||
let key = (dc, family);
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
if let Some(ts) = next_attempt.get(&key)
|
if let Some(ts) = next_attempt.get(&key)
|
||||||
&& now < *ts
|
&& now < *ts
|
||||||
|
|
@ -188,3 +261,207 @@ async fn check_family(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn recover_single_endpoint_outage(
|
||||||
|
pool: &Arc<MePool>,
|
||||||
|
rng: &Arc<SecureRandom>,
|
||||||
|
key: (i32, IpFamily),
|
||||||
|
endpoint: SocketAddr,
|
||||||
|
required: usize,
|
||||||
|
outage_backoff: &mut HashMap<(i32, IpFamily), u64>,
|
||||||
|
outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
|
) {
|
||||||
|
let now = Instant::now();
|
||||||
|
if let Some(ts) = outage_next_attempt.get(&key)
|
||||||
|
&& now < *ts
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms();
|
||||||
|
pool.stats
|
||||||
|
.increment_me_single_endpoint_outage_reconnect_attempt_total();
|
||||||
|
|
||||||
|
let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine();
|
||||||
|
let attempt_ok = if bypass_quarantine {
|
||||||
|
pool.stats
|
||||||
|
.increment_me_single_endpoint_quarantine_bypass_total();
|
||||||
|
match tokio::time::timeout(pool.me_one_timeout, pool.connect_one(endpoint, rng.as_ref())).await {
|
||||||
|
Ok(Ok(())) => true,
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
debug!(
|
||||||
|
dc = %key.0,
|
||||||
|
family = ?key.1,
|
||||||
|
%endpoint,
|
||||||
|
error = %e,
|
||||||
|
"Single-endpoint outage reconnect failed (quarantine bypass path)"
|
||||||
|
);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
debug!(
|
||||||
|
dc = %key.0,
|
||||||
|
family = ?key.1,
|
||||||
|
%endpoint,
|
||||||
|
"Single-endpoint outage reconnect timed out (quarantine bypass path)"
|
||||||
|
);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let one_endpoint = [endpoint];
|
||||||
|
match tokio::time::timeout(
|
||||||
|
pool.me_one_timeout,
|
||||||
|
pool.connect_endpoints_round_robin(&one_endpoint, rng.as_ref()),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(ok) => ok,
|
||||||
|
Err(_) => {
|
||||||
|
debug!(
|
||||||
|
dc = %key.0,
|
||||||
|
family = ?key.1,
|
||||||
|
%endpoint,
|
||||||
|
"Single-endpoint outage reconnect timed out"
|
||||||
|
);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if attempt_ok {
|
||||||
|
pool.stats
|
||||||
|
.increment_me_single_endpoint_outage_reconnect_success_total();
|
||||||
|
pool.stats.increment_me_reconnect_success();
|
||||||
|
outage_backoff.insert(key, min_backoff_ms);
|
||||||
|
let jitter = min_backoff_ms / JITTER_FRAC_NUM;
|
||||||
|
let wait = Duration::from_millis(min_backoff_ms)
|
||||||
|
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
|
||||||
|
outage_next_attempt.insert(key, now + wait);
|
||||||
|
info!(
|
||||||
|
dc = %key.0,
|
||||||
|
family = ?key.1,
|
||||||
|
%endpoint,
|
||||||
|
required,
|
||||||
|
backoff_ms = min_backoff_ms,
|
||||||
|
"Single-endpoint outage reconnect succeeded"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pool.stats.increment_me_reconnect_attempt();
|
||||||
|
let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms);
|
||||||
|
let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms);
|
||||||
|
outage_backoff.insert(key, next_ms);
|
||||||
|
let jitter = next_ms / JITTER_FRAC_NUM;
|
||||||
|
let wait = Duration::from_millis(next_ms)
|
||||||
|
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
|
||||||
|
outage_next_attempt.insert(key, now + wait);
|
||||||
|
warn!(
|
||||||
|
dc = %key.0,
|
||||||
|
family = ?key.1,
|
||||||
|
%endpoint,
|
||||||
|
required,
|
||||||
|
backoff_ms = next_ms,
|
||||||
|
"Single-endpoint outage reconnect scheduled"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn maybe_rotate_single_endpoint_shadow(
|
||||||
|
pool: &Arc<MePool>,
|
||||||
|
rng: &Arc<SecureRandom>,
|
||||||
|
key: (i32, IpFamily),
|
||||||
|
dc: i32,
|
||||||
|
family: IpFamily,
|
||||||
|
endpoints: &[SocketAddr],
|
||||||
|
alive: usize,
|
||||||
|
required: usize,
|
||||||
|
live_writer_ids_by_addr: &HashMap<SocketAddr, Vec<u64>>,
|
||||||
|
shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
|
) {
|
||||||
|
if endpoints.len() != 1 || alive < required {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some(interval) = pool.single_endpoint_shadow_rotate_interval() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
if let Some(deadline) = shadow_rotate_deadline.get(&key)
|
||||||
|
&& now < *deadline
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let endpoint = endpoints[0];
|
||||||
|
let Some(writer_ids) = live_writer_ids_by_addr.get(&endpoint) else {
|
||||||
|
shadow_rotate_deadline.insert(key, now + Duration::from_secs(SHADOW_ROTATE_RETRY_SECS));
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut candidate_writer_id = None;
|
||||||
|
for writer_id in writer_ids {
|
||||||
|
if pool.registry.is_writer_empty(*writer_id).await {
|
||||||
|
candidate_writer_id = Some(*writer_id);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some(old_writer_id) = candidate_writer_id else {
|
||||||
|
shadow_rotate_deadline.insert(key, now + Duration::from_secs(SHADOW_ROTATE_RETRY_SECS));
|
||||||
|
debug!(
|
||||||
|
dc = %dc,
|
||||||
|
?family,
|
||||||
|
%endpoint,
|
||||||
|
alive,
|
||||||
|
required,
|
||||||
|
"Single-endpoint shadow rotation skipped: no empty writer candidate"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let rotate_ok = match tokio::time::timeout(pool.me_one_timeout, pool.connect_one(endpoint, rng.as_ref())).await {
|
||||||
|
Ok(Ok(())) => true,
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
debug!(
|
||||||
|
dc = %dc,
|
||||||
|
?family,
|
||||||
|
%endpoint,
|
||||||
|
error = %e,
|
||||||
|
"Single-endpoint shadow rotation connect failed"
|
||||||
|
);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
debug!(
|
||||||
|
dc = %dc,
|
||||||
|
?family,
|
||||||
|
%endpoint,
|
||||||
|
"Single-endpoint shadow rotation connect timed out"
|
||||||
|
);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !rotate_ok {
|
||||||
|
shadow_rotate_deadline.insert(
|
||||||
|
key,
|
||||||
|
now + interval.min(Duration::from_secs(SHADOW_ROTATE_RETRY_SECS)),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pool.mark_writer_draining_with_timeout(old_writer_id, pool.force_close_timeout(), false)
|
||||||
|
.await;
|
||||||
|
pool.stats.increment_me_single_endpoint_shadow_rotate_total();
|
||||||
|
shadow_rotate_deadline.insert(key, now + interval);
|
||||||
|
info!(
|
||||||
|
dc = %dc,
|
||||||
|
?family,
|
||||||
|
%endpoint,
|
||||||
|
old_writer_id,
|
||||||
|
rotate_every_secs = interval.as_secs(),
|
||||||
|
"Single-endpoint shadow writer rotated"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,11 +16,18 @@ use crate::transport::UpstreamManager;
|
||||||
use super::ConnRegistry;
|
use super::ConnRegistry;
|
||||||
use super::codec::WriterCommand;
|
use super::codec::WriterCommand;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||||
|
pub(super) struct RefillDcKey {
|
||||||
|
pub dc: i32,
|
||||||
|
pub family: IpFamily,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct MeWriter {
|
pub struct MeWriter {
|
||||||
pub id: u64,
|
pub id: u64,
|
||||||
pub addr: SocketAddr,
|
pub addr: SocketAddr,
|
||||||
pub generation: u64,
|
pub generation: u64,
|
||||||
|
pub contour: Arc<AtomicU8>,
|
||||||
pub created_at: Instant,
|
pub created_at: Instant,
|
||||||
pub tx: mpsc::Sender<WriterCommand>,
|
pub tx: mpsc::Sender<WriterCommand>,
|
||||||
pub cancel: CancellationToken,
|
pub cancel: CancellationToken,
|
||||||
|
|
@ -30,6 +37,29 @@ pub struct MeWriter {
|
||||||
pub allow_drain_fallback: Arc<AtomicBool>,
|
pub allow_drain_fallback: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
#[repr(u8)]
|
||||||
|
pub(super) enum WriterContour {
|
||||||
|
Warm = 0,
|
||||||
|
Active = 1,
|
||||||
|
Draining = 2,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WriterContour {
|
||||||
|
pub(super) fn as_u8(self) -> u8 {
|
||||||
|
self as u8
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn from_u8(value: u8) -> Self {
|
||||||
|
match value {
|
||||||
|
0 => Self::Warm,
|
||||||
|
1 => Self::Active,
|
||||||
|
2 => Self::Draining,
|
||||||
|
_ => Self::Draining,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct SecretSnapshot {
|
pub struct SecretSnapshot {
|
||||||
pub epoch: u64,
|
pub epoch: u64,
|
||||||
|
|
@ -71,6 +101,12 @@ pub struct MePool {
|
||||||
pub(super) me_reconnect_backoff_base: Duration,
|
pub(super) me_reconnect_backoff_base: Duration,
|
||||||
pub(super) me_reconnect_backoff_cap: Duration,
|
pub(super) me_reconnect_backoff_cap: Duration,
|
||||||
pub(super) me_reconnect_fast_retry_count: u32,
|
pub(super) me_reconnect_fast_retry_count: u32,
|
||||||
|
pub(super) me_single_endpoint_shadow_writers: AtomicU8,
|
||||||
|
pub(super) me_single_endpoint_outage_mode_enabled: AtomicBool,
|
||||||
|
pub(super) me_single_endpoint_outage_disable_quarantine: AtomicBool,
|
||||||
|
pub(super) me_single_endpoint_outage_backoff_min_ms: AtomicU64,
|
||||||
|
pub(super) me_single_endpoint_outage_backoff_max_ms: AtomicU64,
|
||||||
|
pub(super) me_single_endpoint_shadow_rotate_every_secs: AtomicU64,
|
||||||
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
||||||
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
||||||
pub(super) default_dc: AtomicI32,
|
pub(super) default_dc: AtomicI32,
|
||||||
|
|
@ -80,12 +116,18 @@ pub struct MePool {
|
||||||
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
|
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
|
||||||
pub(super) writer_available: Arc<Notify>,
|
pub(super) writer_available: Arc<Notify>,
|
||||||
pub(super) refill_inflight: Arc<Mutex<HashSet<SocketAddr>>>,
|
pub(super) refill_inflight: Arc<Mutex<HashSet<SocketAddr>>>,
|
||||||
|
pub(super) refill_inflight_dc: Arc<Mutex<HashSet<RefillDcKey>>>,
|
||||||
pub(super) conn_count: AtomicUsize,
|
pub(super) conn_count: AtomicUsize,
|
||||||
pub(super) stats: Arc<crate::stats::Stats>,
|
pub(super) stats: Arc<crate::stats::Stats>,
|
||||||
pub(super) generation: AtomicU64,
|
pub(super) generation: AtomicU64,
|
||||||
|
pub(super) active_generation: AtomicU64,
|
||||||
|
pub(super) warm_generation: AtomicU64,
|
||||||
pub(super) pending_hardswap_generation: AtomicU64,
|
pub(super) pending_hardswap_generation: AtomicU64,
|
||||||
|
pub(super) pending_hardswap_started_at_epoch_secs: AtomicU64,
|
||||||
|
pub(super) pending_hardswap_map_hash: AtomicU64,
|
||||||
pub(super) hardswap: AtomicBool,
|
pub(super) hardswap: AtomicBool,
|
||||||
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
|
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
|
||||||
|
pub(super) kdf_material_fingerprint: Arc<Mutex<HashMap<SocketAddr, u64>>>,
|
||||||
pub(super) me_pool_drain_ttl_secs: AtomicU64,
|
pub(super) me_pool_drain_ttl_secs: AtomicU64,
|
||||||
pub(super) me_pool_force_close_secs: AtomicU64,
|
pub(super) me_pool_force_close_secs: AtomicU64,
|
||||||
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
|
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
|
||||||
|
|
@ -153,6 +195,12 @@ impl MePool {
|
||||||
me_reconnect_backoff_base_ms: u64,
|
me_reconnect_backoff_base_ms: u64,
|
||||||
me_reconnect_backoff_cap_ms: u64,
|
me_reconnect_backoff_cap_ms: u64,
|
||||||
me_reconnect_fast_retry_count: u32,
|
me_reconnect_fast_retry_count: u32,
|
||||||
|
me_single_endpoint_shadow_writers: u8,
|
||||||
|
me_single_endpoint_outage_mode_enabled: bool,
|
||||||
|
me_single_endpoint_outage_disable_quarantine: bool,
|
||||||
|
me_single_endpoint_outage_backoff_min_ms: u64,
|
||||||
|
me_single_endpoint_outage_backoff_max_ms: u64,
|
||||||
|
me_single_endpoint_shadow_rotate_every_secs: u64,
|
||||||
hardswap: bool,
|
hardswap: bool,
|
||||||
me_pool_drain_ttl_secs: u64,
|
me_pool_drain_ttl_secs: u64,
|
||||||
me_pool_force_close_secs: u64,
|
me_pool_force_close_secs: u64,
|
||||||
|
|
@ -223,6 +271,22 @@ impl MePool {
|
||||||
me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms),
|
me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms),
|
||||||
me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms),
|
me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms),
|
||||||
me_reconnect_fast_retry_count,
|
me_reconnect_fast_retry_count,
|
||||||
|
me_single_endpoint_shadow_writers: AtomicU8::new(me_single_endpoint_shadow_writers),
|
||||||
|
me_single_endpoint_outage_mode_enabled: AtomicBool::new(
|
||||||
|
me_single_endpoint_outage_mode_enabled,
|
||||||
|
),
|
||||||
|
me_single_endpoint_outage_disable_quarantine: AtomicBool::new(
|
||||||
|
me_single_endpoint_outage_disable_quarantine,
|
||||||
|
),
|
||||||
|
me_single_endpoint_outage_backoff_min_ms: AtomicU64::new(
|
||||||
|
me_single_endpoint_outage_backoff_min_ms,
|
||||||
|
),
|
||||||
|
me_single_endpoint_outage_backoff_max_ms: AtomicU64::new(
|
||||||
|
me_single_endpoint_outage_backoff_max_ms,
|
||||||
|
),
|
||||||
|
me_single_endpoint_shadow_rotate_every_secs: AtomicU64::new(
|
||||||
|
me_single_endpoint_shadow_rotate_every_secs,
|
||||||
|
),
|
||||||
pool_size: 2,
|
pool_size: 2,
|
||||||
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
|
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
|
||||||
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
|
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
|
||||||
|
|
@ -233,11 +297,17 @@ impl MePool {
|
||||||
nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())),
|
nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())),
|
||||||
writer_available: Arc::new(Notify::new()),
|
writer_available: Arc::new(Notify::new()),
|
||||||
refill_inflight: Arc::new(Mutex::new(HashSet::new())),
|
refill_inflight: Arc::new(Mutex::new(HashSet::new())),
|
||||||
|
refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())),
|
||||||
conn_count: AtomicUsize::new(0),
|
conn_count: AtomicUsize::new(0),
|
||||||
generation: AtomicU64::new(1),
|
generation: AtomicU64::new(1),
|
||||||
|
active_generation: AtomicU64::new(1),
|
||||||
|
warm_generation: AtomicU64::new(0),
|
||||||
pending_hardswap_generation: AtomicU64::new(0),
|
pending_hardswap_generation: AtomicU64::new(0),
|
||||||
|
pending_hardswap_started_at_epoch_secs: AtomicU64::new(0),
|
||||||
|
pending_hardswap_map_hash: AtomicU64::new(0),
|
||||||
hardswap: AtomicBool::new(hardswap),
|
hardswap: AtomicBool::new(hardswap),
|
||||||
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
|
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
kdf_material_fingerprint: Arc::new(Mutex::new(HashMap::new())),
|
||||||
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
|
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
|
||||||
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
|
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
|
||||||
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
|
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
|
||||||
|
|
@ -258,7 +328,7 @@ impl MePool {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn current_generation(&self) -> u64 {
|
pub fn current_generation(&self) -> u64 {
|
||||||
self.generation.load(Ordering::Relaxed)
|
self.active_generation.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_runtime_reinit_policy(
|
pub fn update_runtime_reinit_policy(
|
||||||
|
|
@ -275,6 +345,12 @@ impl MePool {
|
||||||
bind_stale_ttl_secs: u64,
|
bind_stale_ttl_secs: u64,
|
||||||
secret_atomic_snapshot: bool,
|
secret_atomic_snapshot: bool,
|
||||||
deterministic_writer_sort: bool,
|
deterministic_writer_sort: bool,
|
||||||
|
single_endpoint_shadow_writers: u8,
|
||||||
|
single_endpoint_outage_mode_enabled: bool,
|
||||||
|
single_endpoint_outage_disable_quarantine: bool,
|
||||||
|
single_endpoint_outage_backoff_min_ms: u64,
|
||||||
|
single_endpoint_outage_backoff_max_ms: u64,
|
||||||
|
single_endpoint_shadow_rotate_every_secs: u64,
|
||||||
) {
|
) {
|
||||||
self.hardswap.store(hardswap, Ordering::Relaxed);
|
self.hardswap.store(hardswap, Ordering::Relaxed);
|
||||||
self.me_pool_drain_ttl_secs
|
self.me_pool_drain_ttl_secs
|
||||||
|
|
@ -299,6 +375,18 @@ impl MePool {
|
||||||
.store(secret_atomic_snapshot, Ordering::Relaxed);
|
.store(secret_atomic_snapshot, Ordering::Relaxed);
|
||||||
self.me_deterministic_writer_sort
|
self.me_deterministic_writer_sort
|
||||||
.store(deterministic_writer_sort, Ordering::Relaxed);
|
.store(deterministic_writer_sort, Ordering::Relaxed);
|
||||||
|
self.me_single_endpoint_shadow_writers
|
||||||
|
.store(single_endpoint_shadow_writers, Ordering::Relaxed);
|
||||||
|
self.me_single_endpoint_outage_mode_enabled
|
||||||
|
.store(single_endpoint_outage_mode_enabled, Ordering::Relaxed);
|
||||||
|
self.me_single_endpoint_outage_disable_quarantine
|
||||||
|
.store(single_endpoint_outage_disable_quarantine, Ordering::Relaxed);
|
||||||
|
self.me_single_endpoint_outage_backoff_min_ms
|
||||||
|
.store(single_endpoint_outage_backoff_min_ms, Ordering::Relaxed);
|
||||||
|
self.me_single_endpoint_outage_backoff_max_ms
|
||||||
|
.store(single_endpoint_outage_backoff_max_ms, Ordering::Relaxed);
|
||||||
|
self.me_single_endpoint_shadow_rotate_every_secs
|
||||||
|
.store(single_endpoint_shadow_rotate_every_secs, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn reset_stun_state(&self) {
|
pub fn reset_stun_state(&self) {
|
||||||
|
|
@ -363,6 +451,54 @@ impl MePool {
|
||||||
MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed))
|
MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn required_writers_for_dc(&self, endpoint_count: usize) -> usize {
|
||||||
|
if endpoint_count == 0 {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if endpoint_count == 1 {
|
||||||
|
let shadow = self
|
||||||
|
.me_single_endpoint_shadow_writers
|
||||||
|
.load(Ordering::Relaxed) as usize;
|
||||||
|
return (1 + shadow).max(3);
|
||||||
|
}
|
||||||
|
endpoint_count.max(3)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn single_endpoint_outage_mode_enabled(&self) -> bool {
|
||||||
|
self.me_single_endpoint_outage_mode_enabled
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn single_endpoint_outage_disable_quarantine(&self) -> bool {
|
||||||
|
self.me_single_endpoint_outage_disable_quarantine
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn single_endpoint_outage_backoff_bounds_ms(&self) -> (u64, u64) {
|
||||||
|
let min_ms = self
|
||||||
|
.me_single_endpoint_outage_backoff_min_ms
|
||||||
|
.load(Ordering::Relaxed);
|
||||||
|
let max_ms = self
|
||||||
|
.me_single_endpoint_outage_backoff_max_ms
|
||||||
|
.load(Ordering::Relaxed);
|
||||||
|
if min_ms <= max_ms {
|
||||||
|
(min_ms, max_ms)
|
||||||
|
} else {
|
||||||
|
(max_ms, min_ms)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn single_endpoint_shadow_rotate_interval(&self) -> Option<Duration> {
|
||||||
|
let secs = self
|
||||||
|
.me_single_endpoint_shadow_rotate_every_secs
|
||||||
|
.load(Ordering::Relaxed);
|
||||||
|
if secs == 0 {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(Duration::from_secs(secs))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) fn family_order(&self) -> Vec<IpFamily> {
|
pub(super) fn family_order(&self) -> Vec<IpFamily> {
|
||||||
let mut order = Vec::new();
|
let mut order = Vec::new();
|
||||||
if self.decision.prefer_ipv6() {
|
if self.decision.prefer_ipv6() {
|
||||||
|
|
|
||||||
|
|
@ -7,8 +7,9 @@ use std::time::{Duration, Instant};
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
|
|
||||||
use crate::crypto::SecureRandom;
|
use crate::crypto::SecureRandom;
|
||||||
|
use crate::network::IpFamily;
|
||||||
|
|
||||||
use super::pool::MePool;
|
use super::pool::{MePool, RefillDcKey, WriterContour};
|
||||||
|
|
||||||
const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20;
|
const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20;
|
||||||
const ME_FLAP_QUARANTINE_SECS: u64 = 25;
|
const ME_FLAP_QUARANTINE_SECS: u64 = 25;
|
||||||
|
|
@ -27,6 +28,7 @@ impl MePool {
|
||||||
let mut guard = self.endpoint_quarantine.lock().await;
|
let mut guard = self.endpoint_quarantine.lock().await;
|
||||||
guard.retain(|_, expiry| *expiry > Instant::now());
|
guard.retain(|_, expiry| *expiry > Instant::now());
|
||||||
guard.insert(addr, until);
|
guard.insert(addr, until);
|
||||||
|
self.stats.increment_me_endpoint_quarantine_total();
|
||||||
warn!(
|
warn!(
|
||||||
%addr,
|
%addr,
|
||||||
uptime_ms = uptime.as_millis(),
|
uptime_ms = uptime.as_millis(),
|
||||||
|
|
@ -84,14 +86,76 @@ impl MePool {
|
||||||
if endpoints.is_empty() {
|
if endpoints.is_empty() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
let guard = self.refill_inflight.lock().await;
|
let guard = self.refill_inflight.lock().await;
|
||||||
endpoints.iter().any(|addr| guard.contains(addr))
|
if endpoints.iter().any(|addr| guard.contains(addr)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let dc_keys = self.resolve_refill_dc_keys_for_endpoints(endpoints).await;
|
||||||
|
if dc_keys.is_empty() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
let guard = self.refill_inflight_dc.lock().await;
|
||||||
|
dc_keys.iter().any(|key| guard.contains(key))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn resolve_refill_dc_key_for_addr(&self, addr: SocketAddr) -> Option<RefillDcKey> {
|
||||||
|
let family = if addr.is_ipv4() {
|
||||||
|
IpFamily::V4
|
||||||
|
} else {
|
||||||
|
IpFamily::V6
|
||||||
|
};
|
||||||
|
let map = self.proxy_map_for_family(family).await;
|
||||||
|
for (dc, endpoints) in map {
|
||||||
|
if endpoints
|
||||||
|
.into_iter()
|
||||||
|
.any(|(ip, port)| SocketAddr::new(ip, port) == addr)
|
||||||
|
{
|
||||||
|
return Some(RefillDcKey {
|
||||||
|
dc: dc.abs(),
|
||||||
|
family,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn resolve_refill_dc_keys_for_endpoints(
|
||||||
|
&self,
|
||||||
|
endpoints: &[SocketAddr],
|
||||||
|
) -> HashSet<RefillDcKey> {
|
||||||
|
let mut out = HashSet::<RefillDcKey>::new();
|
||||||
|
for addr in endpoints {
|
||||||
|
if let Some(key) = self.resolve_refill_dc_key_for_addr(*addr).await {
|
||||||
|
out.insert(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) async fn connect_endpoints_round_robin(
|
pub(super) async fn connect_endpoints_round_robin(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
endpoints: &[SocketAddr],
|
endpoints: &[SocketAddr],
|
||||||
rng: &SecureRandom,
|
rng: &SecureRandom,
|
||||||
|
) -> bool {
|
||||||
|
self.connect_endpoints_round_robin_with_generation_contour(
|
||||||
|
endpoints,
|
||||||
|
rng,
|
||||||
|
self.current_generation(),
|
||||||
|
WriterContour::Active,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn connect_endpoints_round_robin_with_generation_contour(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
endpoints: &[SocketAddr],
|
||||||
|
rng: &SecureRandom,
|
||||||
|
generation: u64,
|
||||||
|
contour: WriterContour,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
let candidates = self.connectable_endpoints(endpoints).await;
|
let candidates = self.connectable_endpoints(endpoints).await;
|
||||||
if candidates.is_empty() {
|
if candidates.is_empty() {
|
||||||
|
|
@ -101,7 +165,10 @@ impl MePool {
|
||||||
for offset in 0..candidates.len() {
|
for offset in 0..candidates.len() {
|
||||||
let idx = (start + offset) % candidates.len();
|
let idx = (start + offset) % candidates.len();
|
||||||
let addr = candidates[idx];
|
let addr = candidates[idx];
|
||||||
match self.connect_one(addr, rng).await {
|
match self
|
||||||
|
.connect_one_with_generation_contour(addr, rng, generation, contour)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(()) => return true,
|
Ok(()) => return true,
|
||||||
Err(e) => debug!(%addr, error = %e, "ME connect failed during round-robin warmup"),
|
Err(e) => debug!(%addr, error = %e, "ME connect failed during round-robin warmup"),
|
||||||
}
|
}
|
||||||
|
|
@ -225,6 +292,9 @@ impl MePool {
|
||||||
pub(crate) fn trigger_immediate_refill(self: &Arc<Self>, addr: SocketAddr) {
|
pub(crate) fn trigger_immediate_refill(self: &Arc<Self>, addr: SocketAddr) {
|
||||||
let pool = Arc::clone(self);
|
let pool = Arc::clone(self);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
let dc_endpoints = pool.endpoints_for_same_dc(addr).await;
|
||||||
|
let dc_keys = pool.resolve_refill_dc_keys_for_endpoints(&dc_endpoints).await;
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut guard = pool.refill_inflight.lock().await;
|
let mut guard = pool.refill_inflight.lock().await;
|
||||||
if !guard.insert(addr) {
|
if !guard.insert(addr) {
|
||||||
|
|
@ -232,6 +302,19 @@ impl MePool {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !dc_keys.is_empty() {
|
||||||
|
let mut dc_guard = pool.refill_inflight_dc.lock().await;
|
||||||
|
if dc_keys.iter().any(|key| dc_guard.contains(key)) {
|
||||||
|
pool.stats.increment_me_refill_skipped_inflight_total();
|
||||||
|
drop(dc_guard);
|
||||||
|
let mut guard = pool.refill_inflight.lock().await;
|
||||||
|
guard.remove(&addr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
dc_guard.extend(dc_keys.iter().copied());
|
||||||
|
}
|
||||||
|
|
||||||
pool.stats.increment_me_refill_triggered_total();
|
pool.stats.increment_me_refill_triggered_total();
|
||||||
|
|
||||||
let restored = pool.refill_writer_after_loss(addr).await;
|
let restored = pool.refill_writer_after_loss(addr).await;
|
||||||
|
|
@ -241,6 +324,13 @@ impl MePool {
|
||||||
|
|
||||||
let mut guard = pool.refill_inflight.lock().await;
|
let mut guard = pool.refill_inflight.lock().await;
|
||||||
guard.remove(&addr);
|
guard.remove(&addr);
|
||||||
|
drop(guard);
|
||||||
|
if !dc_keys.is_empty() {
|
||||||
|
let mut dc_guard = pool.refill_inflight_dc.lock().await;
|
||||||
|
for key in &dc_keys {
|
||||||
|
dc_guard.remove(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
use std::hash::{Hash, Hasher};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
|
|
@ -7,12 +8,58 @@ use std::time::Duration;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
|
use std::collections::hash_map::DefaultHasher;
|
||||||
|
|
||||||
use crate::crypto::SecureRandom;
|
use crate::crypto::SecureRandom;
|
||||||
|
|
||||||
use super::pool::MePool;
|
use super::pool::{MePool, WriterContour};
|
||||||
|
|
||||||
|
const ME_HARDSWAP_PENDING_TTL_SECS: u64 = 1800;
|
||||||
|
|
||||||
impl MePool {
|
impl MePool {
|
||||||
|
fn desired_map_hash(desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>) -> u64 {
|
||||||
|
let mut hasher = DefaultHasher::new();
|
||||||
|
let mut dcs: Vec<i32> = desired_by_dc.keys().copied().collect();
|
||||||
|
dcs.sort_unstable();
|
||||||
|
for dc in dcs {
|
||||||
|
dc.hash(&mut hasher);
|
||||||
|
let mut endpoints: Vec<SocketAddr> = desired_by_dc
|
||||||
|
.get(&dc)
|
||||||
|
.map(|set| set.iter().copied().collect())
|
||||||
|
.unwrap_or_default();
|
||||||
|
endpoints.sort_unstable();
|
||||||
|
for endpoint in endpoints {
|
||||||
|
endpoint.hash(&mut hasher);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hasher.finish()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear_pending_hardswap_state(&self) {
|
||||||
|
self.pending_hardswap_generation.store(0, Ordering::Relaxed);
|
||||||
|
self.pending_hardswap_started_at_epoch_secs
|
||||||
|
.store(0, Ordering::Relaxed);
|
||||||
|
self.pending_hardswap_map_hash.store(0, Ordering::Relaxed);
|
||||||
|
self.warm_generation.store(0, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn promote_warm_generation_to_active(&self, generation: u64) {
|
||||||
|
self.active_generation.store(generation, Ordering::Relaxed);
|
||||||
|
self.warm_generation.store(0, Ordering::Relaxed);
|
||||||
|
|
||||||
|
let ws = self.writers.read().await;
|
||||||
|
for writer in ws.iter() {
|
||||||
|
if writer.draining.load(Ordering::Relaxed) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if writer.generation == generation {
|
||||||
|
writer
|
||||||
|
.contour
|
||||||
|
.store(WriterContour::Active.as_u8(), Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn coverage_ratio(
|
fn coverage_ratio(
|
||||||
desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>,
|
desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>,
|
||||||
active_writer_addrs: &HashSet<SocketAddr>,
|
active_writer_addrs: &HashSet<SocketAddr>,
|
||||||
|
|
@ -101,10 +148,6 @@ impl MePool {
|
||||||
out
|
out
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn required_writers_for_dc(endpoint_count: usize) -> usize {
|
|
||||||
endpoint_count.max(3)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn hardswap_warmup_connect_delay_ms(&self) -> u64 {
|
fn hardswap_warmup_connect_delay_ms(&self) -> u64 {
|
||||||
let min_ms = self.me_hardswap_warmup_delay_min_ms.load(Ordering::Relaxed);
|
let min_ms = self.me_hardswap_warmup_delay_min_ms.load(Ordering::Relaxed);
|
||||||
let max_ms = self.me_hardswap_warmup_delay_max_ms.load(Ordering::Relaxed);
|
let max_ms = self.me_hardswap_warmup_delay_max_ms.load(Ordering::Relaxed);
|
||||||
|
|
@ -174,7 +217,7 @@ impl MePool {
|
||||||
|
|
||||||
let mut endpoint_list: Vec<SocketAddr> = endpoints.iter().copied().collect();
|
let mut endpoint_list: Vec<SocketAddr> = endpoints.iter().copied().collect();
|
||||||
endpoint_list.sort_unstable();
|
endpoint_list.sort_unstable();
|
||||||
let required = Self::required_writers_for_dc(endpoint_list.len());
|
let required = self.required_writers_for_dc(endpoint_list.len());
|
||||||
let mut completed = false;
|
let mut completed = false;
|
||||||
let mut last_fresh_count = self
|
let mut last_fresh_count = self
|
||||||
.fresh_writer_count_for_endpoints(generation, endpoints)
|
.fresh_writer_count_for_endpoints(generation, endpoints)
|
||||||
|
|
@ -202,7 +245,14 @@ impl MePool {
|
||||||
let delay_ms = self.hardswap_warmup_connect_delay_ms();
|
let delay_ms = self.hardswap_warmup_connect_delay_ms();
|
||||||
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
|
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
|
||||||
|
|
||||||
let connected = self.connect_endpoints_round_robin(&endpoint_list, rng).await;
|
let connected = self
|
||||||
|
.connect_endpoints_round_robin_with_generation_contour(
|
||||||
|
&endpoint_list,
|
||||||
|
rng,
|
||||||
|
generation,
|
||||||
|
WriterContour::Warm,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
debug!(
|
debug!(
|
||||||
dc = *dc,
|
dc = *dc,
|
||||||
pass = pass_idx + 1,
|
pass = pass_idx + 1,
|
||||||
|
|
@ -265,29 +315,61 @@ impl MePool {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let desired_map_hash = Self::desired_map_hash(&desired_by_dc);
|
||||||
|
let now_epoch_secs = Self::now_epoch_secs();
|
||||||
let previous_generation = self.current_generation();
|
let previous_generation = self.current_generation();
|
||||||
let hardswap = self.hardswap.load(Ordering::Relaxed);
|
let hardswap = self.hardswap.load(Ordering::Relaxed);
|
||||||
let generation = if hardswap {
|
let generation = if hardswap {
|
||||||
let pending_generation = self.pending_hardswap_generation.load(Ordering::Relaxed);
|
let pending_generation = self.pending_hardswap_generation.load(Ordering::Relaxed);
|
||||||
if pending_generation != 0 && pending_generation >= previous_generation {
|
let pending_started_at = self
|
||||||
|
.pending_hardswap_started_at_epoch_secs
|
||||||
|
.load(Ordering::Relaxed);
|
||||||
|
let pending_map_hash = self.pending_hardswap_map_hash.load(Ordering::Relaxed);
|
||||||
|
let pending_age_secs = now_epoch_secs.saturating_sub(pending_started_at);
|
||||||
|
let pending_ttl_expired = pending_started_at > 0 && pending_age_secs > ME_HARDSWAP_PENDING_TTL_SECS;
|
||||||
|
let pending_matches_map = pending_map_hash != 0 && pending_map_hash == desired_map_hash;
|
||||||
|
|
||||||
|
if pending_generation != 0
|
||||||
|
&& pending_generation >= previous_generation
|
||||||
|
&& pending_matches_map
|
||||||
|
&& !pending_ttl_expired
|
||||||
|
{
|
||||||
|
self.stats.increment_me_hardswap_pending_reuse_total();
|
||||||
debug!(
|
debug!(
|
||||||
previous_generation,
|
previous_generation,
|
||||||
generation = pending_generation,
|
generation = pending_generation,
|
||||||
|
pending_age_secs,
|
||||||
"ME hardswap continues with pending generation"
|
"ME hardswap continues with pending generation"
|
||||||
);
|
);
|
||||||
pending_generation
|
pending_generation
|
||||||
} else {
|
} else {
|
||||||
|
if pending_generation != 0 && pending_ttl_expired {
|
||||||
|
self.stats.increment_me_hardswap_pending_ttl_expired_total();
|
||||||
|
warn!(
|
||||||
|
previous_generation,
|
||||||
|
generation = pending_generation,
|
||||||
|
pending_age_secs,
|
||||||
|
pending_ttl_secs = ME_HARDSWAP_PENDING_TTL_SECS,
|
||||||
|
"ME hardswap pending generation expired by TTL; starting fresh generation"
|
||||||
|
);
|
||||||
|
}
|
||||||
let next_generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1;
|
let next_generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1;
|
||||||
self.pending_hardswap_generation
|
self.pending_hardswap_generation
|
||||||
.store(next_generation, Ordering::Relaxed);
|
.store(next_generation, Ordering::Relaxed);
|
||||||
|
self.pending_hardswap_started_at_epoch_secs
|
||||||
|
.store(now_epoch_secs, Ordering::Relaxed);
|
||||||
|
self.pending_hardswap_map_hash
|
||||||
|
.store(desired_map_hash, Ordering::Relaxed);
|
||||||
|
self.warm_generation.store(next_generation, Ordering::Relaxed);
|
||||||
next_generation
|
next_generation
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
self.pending_hardswap_generation.store(0, Ordering::Relaxed);
|
self.clear_pending_hardswap_state();
|
||||||
self.generation.fetch_add(1, Ordering::Relaxed) + 1
|
self.generation.fetch_add(1, Ordering::Relaxed) + 1
|
||||||
};
|
};
|
||||||
|
|
||||||
if hardswap {
|
if hardswap {
|
||||||
|
self.warm_generation.store(generation, Ordering::Relaxed);
|
||||||
self.warmup_generation_for_all_dcs(rng, generation, &desired_by_dc)
|
self.warmup_generation_for_all_dcs(rng, generation, &desired_by_dc)
|
||||||
.await;
|
.await;
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -323,7 +405,7 @@ impl MePool {
|
||||||
if endpoints.is_empty() {
|
if endpoints.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let required = Self::required_writers_for_dc(endpoints.len());
|
let required = self.required_writers_for_dc(endpoints.len());
|
||||||
let fresh_count = writers
|
let fresh_count = writers
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|w| !w.draining.load(Ordering::Relaxed))
|
.filter(|w| !w.draining.load(Ordering::Relaxed))
|
||||||
|
|
@ -352,6 +434,10 @@ impl MePool {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if hardswap {
|
||||||
|
self.promote_warm_generation_to_active(generation).await;
|
||||||
|
}
|
||||||
|
|
||||||
let desired_addrs: HashSet<SocketAddr> = desired_by_dc
|
let desired_addrs: HashSet<SocketAddr> = desired_by_dc
|
||||||
.values()
|
.values()
|
||||||
.flat_map(|set| set.iter().copied())
|
.flat_map(|set| set.iter().copied())
|
||||||
|
|
@ -373,7 +459,7 @@ impl MePool {
|
||||||
|
|
||||||
if stale_writer_ids.is_empty() {
|
if stale_writer_ids.is_empty() {
|
||||||
if hardswap {
|
if hardswap {
|
||||||
self.pending_hardswap_generation.store(0, Ordering::Relaxed);
|
self.clear_pending_hardswap_state();
|
||||||
}
|
}
|
||||||
debug!("ME reinit cycle completed with no stale writers");
|
debug!("ME reinit cycle completed with no stale writers");
|
||||||
return;
|
return;
|
||||||
|
|
@ -397,7 +483,7 @@ impl MePool {
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
if hardswap {
|
if hardswap {
|
||||||
self.pending_hardswap_generation.store(0, Ordering::Relaxed);
|
self.clear_pending_hardswap_state();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
|
|
@ -15,7 +15,7 @@ use crate::error::{ProxyError, Result};
|
||||||
use crate::protocol::constants::RPC_PING_U32;
|
use crate::protocol::constants::RPC_PING_U32;
|
||||||
|
|
||||||
use super::codec::{RpcWriter, WriterCommand};
|
use super::codec::{RpcWriter, WriterCommand};
|
||||||
use super::pool::{MePool, MeWriter};
|
use super::pool::{MePool, MeWriter, WriterContour};
|
||||||
use super::reader::reader_loop;
|
use super::reader::reader_loop;
|
||||||
use super::registry::BoundConn;
|
use super::registry::BoundConn;
|
||||||
|
|
||||||
|
|
@ -43,6 +43,22 @@ impl MePool {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn connect_one(self: &Arc<Self>, addr: SocketAddr, rng: &SecureRandom) -> Result<()> {
|
pub(crate) async fn connect_one(self: &Arc<Self>, addr: SocketAddr, rng: &SecureRandom) -> Result<()> {
|
||||||
|
self.connect_one_with_generation_contour(
|
||||||
|
addr,
|
||||||
|
rng,
|
||||||
|
self.current_generation(),
|
||||||
|
WriterContour::Active,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn connect_one_with_generation_contour(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
addr: SocketAddr,
|
||||||
|
rng: &SecureRandom,
|
||||||
|
generation: u64,
|
||||||
|
contour: WriterContour,
|
||||||
|
) -> Result<()> {
|
||||||
let secret_len = self.proxy_secret.read().await.secret.len();
|
let secret_len = self.proxy_secret.read().await.secret.len();
|
||||||
if secret_len < 32 {
|
if secret_len < 32 {
|
||||||
return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into()));
|
return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into()));
|
||||||
|
|
@ -52,7 +68,7 @@ impl MePool {
|
||||||
let hs = self.handshake_only(stream, addr, upstream_egress, rng).await?;
|
let hs = self.handshake_only(stream, addr, upstream_egress, rng).await?;
|
||||||
|
|
||||||
let writer_id = self.next_writer_id.fetch_add(1, Ordering::Relaxed);
|
let writer_id = self.next_writer_id.fetch_add(1, Ordering::Relaxed);
|
||||||
let generation = self.current_generation();
|
let contour = Arc::new(AtomicU8::new(contour.as_u8()));
|
||||||
let cancel = CancellationToken::new();
|
let cancel = CancellationToken::new();
|
||||||
let degraded = Arc::new(AtomicBool::new(false));
|
let degraded = Arc::new(AtomicBool::new(false));
|
||||||
let draining = Arc::new(AtomicBool::new(false));
|
let draining = Arc::new(AtomicBool::new(false));
|
||||||
|
|
@ -89,6 +105,7 @@ impl MePool {
|
||||||
id: writer_id,
|
id: writer_id,
|
||||||
addr,
|
addr,
|
||||||
generation,
|
generation,
|
||||||
|
contour: contour.clone(),
|
||||||
created_at: Instant::now(),
|
created_at: Instant::now(),
|
||||||
tx: tx.clone(),
|
tx: tx.clone(),
|
||||||
cancel: cancel.clone(),
|
cancel: cancel.clone(),
|
||||||
|
|
@ -305,6 +322,8 @@ impl MePool {
|
||||||
if !already_draining {
|
if !already_draining {
|
||||||
self.stats.increment_pool_drain_active();
|
self.stats.increment_pool_drain_active();
|
||||||
}
|
}
|
||||||
|
w.contour
|
||||||
|
.store(WriterContour::Draining.as_u8(), Ordering::Relaxed);
|
||||||
w.draining.store(true, Ordering::Relaxed);
|
w.draining.store(true, Ordering::Relaxed);
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,7 @@ pub(crate) async fn reader_loop(
|
||||||
_ = cancel.cancelled() => return Ok(()),
|
_ = cancel.cancelled() => return Ok(()),
|
||||||
};
|
};
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
|
stats.increment_me_reader_eof_total();
|
||||||
return Err(ProxyError::Io(std::io::Error::new(
|
return Err(ProxyError::Io(std::io::Error::new(
|
||||||
ErrorKind::UnexpectedEof,
|
ErrorKind::UnexpectedEof,
|
||||||
"ME socket closed by peer",
|
"ME socket closed by peer",
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ use crate::protocol::constants::RPC_CLOSE_EXT_U32;
|
||||||
|
|
||||||
use super::MePool;
|
use super::MePool;
|
||||||
use super::codec::WriterCommand;
|
use super::codec::WriterCommand;
|
||||||
|
use super::pool::WriterContour;
|
||||||
use super::wire::build_proxy_req_payload;
|
use super::wire::build_proxy_req_payload;
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
use super::registry::ConnMeta;
|
use super::registry::ConnMeta;
|
||||||
|
|
@ -101,7 +102,14 @@ impl MePool {
|
||||||
ws.clone()
|
ws.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut candidate_indices = self.candidate_indices_for_dc(&writers_snapshot, target_dc).await;
|
let mut candidate_indices = self
|
||||||
|
.candidate_indices_for_dc(&writers_snapshot, target_dc, false)
|
||||||
|
.await;
|
||||||
|
if candidate_indices.is_empty() {
|
||||||
|
candidate_indices = self
|
||||||
|
.candidate_indices_for_dc(&writers_snapshot, target_dc, true)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
if candidate_indices.is_empty() {
|
if candidate_indices.is_empty() {
|
||||||
// Emergency connect-on-demand
|
// Emergency connect-on-demand
|
||||||
if emergency_attempts >= 3 {
|
if emergency_attempts >= 3 {
|
||||||
|
|
@ -127,7 +135,14 @@ impl MePool {
|
||||||
let ws2 = self.writers.read().await;
|
let ws2 = self.writers.read().await;
|
||||||
writers_snapshot = ws2.clone();
|
writers_snapshot = ws2.clone();
|
||||||
drop(ws2);
|
drop(ws2);
|
||||||
candidate_indices = self.candidate_indices_for_dc(&writers_snapshot, target_dc).await;
|
candidate_indices = self
|
||||||
|
.candidate_indices_for_dc(&writers_snapshot, target_dc, false)
|
||||||
|
.await;
|
||||||
|
if candidate_indices.is_empty() {
|
||||||
|
candidate_indices = self
|
||||||
|
.candidate_indices_for_dc(&writers_snapshot, target_dc, true)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
if !candidate_indices.is_empty() {
|
if !candidate_indices.is_empty() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
@ -143,6 +158,7 @@ impl MePool {
|
||||||
let left = &writers_snapshot[*lhs];
|
let left = &writers_snapshot[*lhs];
|
||||||
let right = &writers_snapshot[*rhs];
|
let right = &writers_snapshot[*rhs];
|
||||||
let left_key = (
|
let left_key = (
|
||||||
|
self.writer_contour_rank_for_selection(left),
|
||||||
(left.generation < self.current_generation()) as usize,
|
(left.generation < self.current_generation()) as usize,
|
||||||
left.degraded.load(Ordering::Relaxed) as usize,
|
left.degraded.load(Ordering::Relaxed) as usize,
|
||||||
Reverse(left.tx.capacity()),
|
Reverse(left.tx.capacity()),
|
||||||
|
|
@ -150,6 +166,7 @@ impl MePool {
|
||||||
left.id,
|
left.id,
|
||||||
);
|
);
|
||||||
let right_key = (
|
let right_key = (
|
||||||
|
self.writer_contour_rank_for_selection(right),
|
||||||
(right.generation < self.current_generation()) as usize,
|
(right.generation < self.current_generation()) as usize,
|
||||||
right.degraded.load(Ordering::Relaxed) as usize,
|
right.degraded.load(Ordering::Relaxed) as usize,
|
||||||
Reverse(right.tx.capacity()),
|
Reverse(right.tx.capacity()),
|
||||||
|
|
@ -163,7 +180,12 @@ impl MePool {
|
||||||
let w = &writers_snapshot[*idx];
|
let w = &writers_snapshot[*idx];
|
||||||
let degraded = w.degraded.load(Ordering::Relaxed);
|
let degraded = w.degraded.load(Ordering::Relaxed);
|
||||||
let stale = (w.generation < self.current_generation()) as usize;
|
let stale = (w.generation < self.current_generation()) as usize;
|
||||||
(stale, degraded as usize, Reverse(w.tx.capacity()))
|
(
|
||||||
|
self.writer_contour_rank_for_selection(w),
|
||||||
|
stale,
|
||||||
|
degraded as usize,
|
||||||
|
Reverse(w.tx.capacity()),
|
||||||
|
)
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -257,6 +279,7 @@ impl MePool {
|
||||||
&self,
|
&self,
|
||||||
writers: &[super::pool::MeWriter],
|
writers: &[super::pool::MeWriter],
|
||||||
target_dc: i16,
|
target_dc: i16,
|
||||||
|
include_warm: bool,
|
||||||
) -> Vec<usize> {
|
) -> Vec<usize> {
|
||||||
let key = target_dc as i32;
|
let key = target_dc as i32;
|
||||||
let mut preferred = Vec::<SocketAddr>::new();
|
let mut preferred = Vec::<SocketAddr>::new();
|
||||||
|
|
@ -300,13 +323,13 @@ impl MePool {
|
||||||
|
|
||||||
if preferred.is_empty() {
|
if preferred.is_empty() {
|
||||||
return (0..writers.len())
|
return (0..writers.len())
|
||||||
.filter(|i| self.writer_accepts_new_binding(&writers[*i]))
|
.filter(|i| self.writer_eligible_for_selection(&writers[*i], include_warm))
|
||||||
.collect();
|
.collect();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut out = Vec::new();
|
let mut out = Vec::new();
|
||||||
for (idx, w) in writers.iter().enumerate() {
|
for (idx, w) in writers.iter().enumerate() {
|
||||||
if !self.writer_accepts_new_binding(w) {
|
if !self.writer_eligible_for_selection(w, include_warm) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if preferred.contains(&w.addr) {
|
if preferred.contains(&w.addr) {
|
||||||
|
|
@ -315,10 +338,33 @@ impl MePool {
|
||||||
}
|
}
|
||||||
if out.is_empty() {
|
if out.is_empty() {
|
||||||
return (0..writers.len())
|
return (0..writers.len())
|
||||||
.filter(|i| self.writer_accepts_new_binding(&writers[*i]))
|
.filter(|i| self.writer_eligible_for_selection(&writers[*i], include_warm))
|
||||||
.collect();
|
.collect();
|
||||||
}
|
}
|
||||||
out
|
out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn writer_eligible_for_selection(
|
||||||
|
&self,
|
||||||
|
writer: &super::pool::MeWriter,
|
||||||
|
include_warm: bool,
|
||||||
|
) -> bool {
|
||||||
|
if !self.writer_accepts_new_binding(writer) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) {
|
||||||
|
WriterContour::Active => true,
|
||||||
|
WriterContour::Warm => include_warm,
|
||||||
|
WriterContour::Draining => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn writer_contour_rank_for_selection(&self, writer: &super::pool::MeWriter) -> usize {
|
||||||
|
match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) {
|
||||||
|
WriterContour::Active => 0,
|
||||||
|
WriterContour::Warm => 1,
|
||||||
|
WriterContour::Draining => 2,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue