Merge latest upstream/main into test/main-into-flow-sec

This commit is contained in:
David Osipov 2026-03-20 18:00:20 +04:00
commit 79093679ab
No known key found for this signature in database
GPG Key ID: 0E55C4A47454E82E
11 changed files with 462 additions and 19 deletions

View File

@ -107,6 +107,25 @@ pub(super) struct RuntimeMeQualityRouteDropData {
pub(super) queue_full_high_total: u64, pub(super) queue_full_high_total: u64,
} }
/// Per-IP-family ME runtime health entry in the runtime quality API payload.
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityFamilyStateData {
/// Family label; populated as "v4" / "v6" by the pool snapshot.
pub(super) family: &'static str,
/// Stable lowercase state label (e.g. "healthy", "suppressed").
pub(super) state: &'static str,
/// Epoch seconds at which the current state was entered.
pub(super) state_since_epoch_secs: u64,
/// Suppression deadline in epoch seconds; omitted from JSON when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) suppressed_until_epoch_secs: Option<u64>,
/// Consecutive failure count for this family.
pub(super) fail_streak: u32,
/// Consecutive success count while recovering.
pub(super) recover_success_streak: u32,
}
/// Last drain-gate verdict exposed on the runtime quality API.
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityDrainGateData {
/// Whether weighted route coverage met the quorum threshold.
pub(super) route_quorum_ok: bool,
/// Whether every desired DC group had fresh coverage.
pub(super) redundancy_ok: bool,
/// Stable snake_case reason label (e.g. "open", "coverage_quorum").
pub(super) block_reason: &'static str,
/// Epoch seconds when the gate verdict was last published.
pub(super) updated_at_epoch_secs: u64,
}
#[derive(Serialize)] #[derive(Serialize)]
pub(super) struct RuntimeMeQualityDcRttData { pub(super) struct RuntimeMeQualityDcRttData {
pub(super) dc: i16, pub(super) dc: i16,
@ -120,6 +139,8 @@ pub(super) struct RuntimeMeQualityDcRttData {
pub(super) struct RuntimeMeQualityPayload { pub(super) struct RuntimeMeQualityPayload {
pub(super) counters: RuntimeMeQualityCountersData, pub(super) counters: RuntimeMeQualityCountersData,
pub(super) route_drops: RuntimeMeQualityRouteDropData, pub(super) route_drops: RuntimeMeQualityRouteDropData,
pub(super) family_states: Vec<RuntimeMeQualityFamilyStateData>,
pub(super) drain_gate: RuntimeMeQualityDrainGateData,
pub(super) dc_rtt: Vec<RuntimeMeQualityDcRttData>, pub(super) dc_rtt: Vec<RuntimeMeQualityDcRttData>,
} }
@ -360,6 +381,19 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
}; };
let status = pool.api_status_snapshot().await; let status = pool.api_status_snapshot().await;
let family_states = pool
.api_family_state_snapshot()
.into_iter()
.map(|entry| RuntimeMeQualityFamilyStateData {
family: entry.family,
state: entry.state,
state_since_epoch_secs: entry.state_since_epoch_secs,
suppressed_until_epoch_secs: entry.suppressed_until_epoch_secs,
fail_streak: entry.fail_streak,
recover_success_streak: entry.recover_success_streak,
})
.collect();
let drain_gate_snapshot = pool.api_drain_gate_snapshot();
RuntimeMeQualityData { RuntimeMeQualityData {
enabled: true, enabled: true,
reason: None, reason: None,
@ -380,6 +414,13 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
queue_full_base_total: shared.stats.get_me_route_drop_queue_full_base(), queue_full_base_total: shared.stats.get_me_route_drop_queue_full_base(),
queue_full_high_total: shared.stats.get_me_route_drop_queue_full_high(), queue_full_high_total: shared.stats.get_me_route_drop_queue_full_high(),
}, },
family_states,
drain_gate: RuntimeMeQualityDrainGateData {
route_quorum_ok: drain_gate_snapshot.route_quorum_ok,
redundancy_ok: drain_gate_snapshot.redundancy_ok,
block_reason: drain_gate_snapshot.block_reason,
updated_at_epoch_secs: drain_gate_snapshot.updated_at_epoch_secs,
},
dc_rtt: status dc_rtt: status
.dcs .dcs
.into_iter() .into_iter()

View File

@ -65,6 +65,10 @@ pub(crate) fn default_tls_domain() -> String {
"petrovich.ru".to_string() "petrovich.ru".to_string()
} }
/// Default for `censorship.tls_fetch_scope`: the empty scope, which keeps
/// the default (unscoped) upstream routing behavior for TLS front fetches.
pub(crate) fn default_tls_fetch_scope() -> String {
    String::default()
}
pub(crate) fn default_mask_port() -> u16 { pub(crate) fn default_mask_port() -> u16 {
443 443
} }

View File

@ -566,6 +566,7 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
} }
if old.censorship.tls_domain != new.censorship.tls_domain if old.censorship.tls_domain != new.censorship.tls_domain
|| old.censorship.tls_domains != new.censorship.tls_domains || old.censorship.tls_domains != new.censorship.tls_domains
|| old.censorship.tls_fetch_scope != new.censorship.tls_fetch_scope
|| old.censorship.mask != new.censorship.mask || old.censorship.mask != new.censorship.mask
|| old.censorship.mask_host != new.censorship.mask_host || old.censorship.mask_host != new.censorship.mask_host
|| old.censorship.mask_port != new.censorship.mask_port || old.censorship.mask_port != new.censorship.mask_port

View File

@ -815,6 +815,9 @@ impl ProxyConfig {
config.censorship.mask_host = Some(config.censorship.tls_domain.clone()); config.censorship.mask_host = Some(config.censorship.tls_domain.clone());
} }
// Normalize optional TLS fetch scope: whitespace-only values disable scoped routing.
config.censorship.tls_fetch_scope = config.censorship.tls_fetch_scope.trim().to_string();
// Merge primary + extra TLS domains, deduplicate (primary always first). // Merge primary + extra TLS domains, deduplicate (primary always first).
if !config.censorship.tls_domains.is_empty() { if !config.censorship.tls_domains.is_empty() {
let mut all = Vec::with_capacity(1 + config.censorship.tls_domains.len()); let mut all = Vec::with_capacity(1 + config.censorship.tls_domains.len());
@ -2137,6 +2140,59 @@ mod tests {
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
#[test]
fn tls_fetch_scope_default_is_empty() {
    // Omitting `tls_fetch_scope` entirely must yield the empty (disabled) scope.
    let config_text = r#"
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
    let config_path = std::env::temp_dir().join("telemt_tls_fetch_scope_default_test.toml");
    std::fs::write(&config_path, config_text).unwrap();
    let loaded = ProxyConfig::load(&config_path).unwrap();
    assert!(loaded.censorship.tls_fetch_scope.is_empty());
    let _ = std::fs::remove_file(config_path);
}
#[test]
fn tls_fetch_scope_is_trimmed_during_load() {
    // Surrounding whitespace in the configured scope must be stripped on load.
    let config_text = r#"
[censorship]
tls_domain = "example.com"
tls_fetch_scope = " me "
[access.users]
user = "00000000000000000000000000000000"
"#;
    let config_path = std::env::temp_dir().join("telemt_tls_fetch_scope_trim_test.toml");
    std::fs::write(&config_path, config_text).unwrap();
    let loaded = ProxyConfig::load(&config_path).unwrap();
    assert_eq!(loaded.censorship.tls_fetch_scope, "me");
    let _ = std::fs::remove_file(config_path);
}
#[test]
fn tls_fetch_scope_whitespace_becomes_empty() {
    // A whitespace-only scope trims down to empty, i.e. scoped routing stays off.
    let config_text = r#"
[censorship]
tls_domain = "example.com"
tls_fetch_scope = " "
[access.users]
user = "00000000000000000000000000000000"
"#;
    let config_path = std::env::temp_dir().join("telemt_tls_fetch_scope_blank_test.toml");
    std::fs::write(&config_path, config_text).unwrap();
    let loaded = ProxyConfig::load(&config_path).unwrap();
    assert!(loaded.censorship.tls_fetch_scope.is_empty());
    let _ = std::fs::remove_file(config_path);
}
#[test] #[test]
fn invalid_ad_tag_is_disabled_during_load() { fn invalid_ad_tag_is_disabled_during_load() {
let toml = r#" let toml = r#"

View File

@ -1339,6 +1339,11 @@ pub struct AntiCensorshipConfig {
#[serde(default)] #[serde(default)]
pub tls_domains: Vec<String>, pub tls_domains: Vec<String>,
/// Upstream scope used for TLS front metadata fetches.
/// Empty value keeps default upstream routing behavior.
#[serde(default = "default_tls_fetch_scope")]
pub tls_fetch_scope: String,
#[serde(default = "default_true")] #[serde(default = "default_true")]
pub mask: bool, pub mask: bool,
@ -1396,6 +1401,7 @@ impl Default for AntiCensorshipConfig {
Self { Self {
tls_domain: default_tls_domain(), tls_domain: default_tls_domain(),
tls_domains: Vec::new(), tls_domains: Vec::new(),
tls_fetch_scope: default_tls_fetch_scope(),
mask: default_true(), mask: default_true(),
mask_host: None, mask_host: None,
mask_port: default_mask_port(), mask_port: default_mask_port(),

View File

@ -38,12 +38,15 @@ pub(crate) async fn bootstrap_tls_front(
.clone() .clone()
.unwrap_or_else(|| config.censorship.tls_domain.clone()); .unwrap_or_else(|| config.censorship.tls_domain.clone());
let mask_unix_sock = config.censorship.mask_unix_sock.clone(); let mask_unix_sock = config.censorship.mask_unix_sock.clone();
let tls_fetch_scope = (!config.censorship.tls_fetch_scope.is_empty())
.then(|| config.censorship.tls_fetch_scope.clone());
let fetch_timeout = Duration::from_secs(5); let fetch_timeout = Duration::from_secs(5);
let cache_initial = cache.clone(); let cache_initial = cache.clone();
let domains_initial = tls_domains.to_vec(); let domains_initial = tls_domains.to_vec();
let host_initial = mask_host.clone(); let host_initial = mask_host.clone();
let unix_sock_initial = mask_unix_sock.clone(); let unix_sock_initial = mask_unix_sock.clone();
let scope_initial = tls_fetch_scope.clone();
let upstream_initial = upstream_manager.clone(); let upstream_initial = upstream_manager.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut join = tokio::task::JoinSet::new(); let mut join = tokio::task::JoinSet::new();
@ -51,6 +54,7 @@ pub(crate) async fn bootstrap_tls_front(
let cache_domain = cache_initial.clone(); let cache_domain = cache_initial.clone();
let host_domain = host_initial.clone(); let host_domain = host_initial.clone();
let unix_sock_domain = unix_sock_initial.clone(); let unix_sock_domain = unix_sock_initial.clone();
let scope_domain = scope_initial.clone();
let upstream_domain = upstream_initial.clone(); let upstream_domain = upstream_initial.clone();
join.spawn(async move { join.spawn(async move {
match crate::tls_front::fetcher::fetch_real_tls( match crate::tls_front::fetcher::fetch_real_tls(
@ -59,6 +63,7 @@ pub(crate) async fn bootstrap_tls_front(
&domain, &domain,
fetch_timeout, fetch_timeout,
Some(upstream_domain), Some(upstream_domain),
scope_domain.as_deref(),
proxy_protocol, proxy_protocol,
unix_sock_domain.as_deref(), unix_sock_domain.as_deref(),
) )
@ -100,6 +105,7 @@ pub(crate) async fn bootstrap_tls_front(
let domains_refresh = tls_domains.to_vec(); let domains_refresh = tls_domains.to_vec();
let host_refresh = mask_host.clone(); let host_refresh = mask_host.clone();
let unix_sock_refresh = mask_unix_sock.clone(); let unix_sock_refresh = mask_unix_sock.clone();
let scope_refresh = tls_fetch_scope.clone();
let upstream_refresh = upstream_manager.clone(); let upstream_refresh = upstream_manager.clone();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
@ -112,6 +118,7 @@ pub(crate) async fn bootstrap_tls_front(
let cache_domain = cache_refresh.clone(); let cache_domain = cache_refresh.clone();
let host_domain = host_refresh.clone(); let host_domain = host_refresh.clone();
let unix_sock_domain = unix_sock_refresh.clone(); let unix_sock_domain = unix_sock_refresh.clone();
let scope_domain = scope_refresh.clone();
let upstream_domain = upstream_refresh.clone(); let upstream_domain = upstream_refresh.clone();
join.spawn(async move { join.spawn(async move {
match crate::tls_front::fetcher::fetch_real_tls( match crate::tls_front::fetcher::fetch_real_tls(
@ -120,6 +127,7 @@ pub(crate) async fn bootstrap_tls_front(
&domain, &domain,
fetch_timeout, fetch_timeout,
Some(upstream_domain), Some(upstream_domain),
scope_domain.as_deref(),
proxy_protocol, proxy_protocol,
unix_sock_domain.as_deref(), unix_sock_domain.as_deref(),
) )

View File

@ -394,15 +394,17 @@ async fn connect_tcp_with_upstream(
port: u16, port: u16,
connect_timeout: Duration, connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>, upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
scope: Option<&str>,
) -> Result<TcpStream> { ) -> Result<TcpStream> {
if let Some(manager) = upstream { if let Some(manager) = upstream {
if let Some(addr) = resolve_socket_addr(host, port) { if let Some(addr) = resolve_socket_addr(host, port) {
match manager.connect(addr, None, None).await { match manager.connect(addr, None, scope).await {
Ok(stream) => return Ok(stream), Ok(stream) => return Ok(stream),
Err(e) => { Err(e) => {
warn!( warn!(
host = %host, host = %host,
port = port, port = port,
scope = ?scope,
error = %e, error = %e,
"Upstream connect failed, using direct connect" "Upstream connect failed, using direct connect"
); );
@ -410,12 +412,13 @@ async fn connect_tcp_with_upstream(
} }
} else if let Ok(mut addrs) = tokio::net::lookup_host((host, port)).await { } else if let Ok(mut addrs) = tokio::net::lookup_host((host, port)).await {
if let Some(addr) = addrs.find(|a| a.is_ipv4()) { if let Some(addr) = addrs.find(|a| a.is_ipv4()) {
match manager.connect(addr, None, None).await { match manager.connect(addr, None, scope).await {
Ok(stream) => return Ok(stream), Ok(stream) => return Ok(stream),
Err(e) => { Err(e) => {
warn!( warn!(
host = %host, host = %host,
port = port, port = port,
scope = ?scope,
error = %e, error = %e,
"Upstream connect failed, using direct connect" "Upstream connect failed, using direct connect"
); );
@ -537,6 +540,7 @@ async fn fetch_via_raw_tls(
sni: &str, sni: &str,
connect_timeout: Duration, connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>, upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
scope: Option<&str>,
proxy_protocol: u8, proxy_protocol: u8,
unix_sock: Option<&str>, unix_sock: Option<&str>,
) -> Result<TlsFetchResult> { ) -> Result<TlsFetchResult> {
@ -572,7 +576,7 @@ async fn fetch_via_raw_tls(
#[cfg(not(unix))] #[cfg(not(unix))]
let _ = unix_sock; let _ = unix_sock;
let stream = connect_tcp_with_upstream(host, port, connect_timeout, upstream).await?; let stream = connect_tcp_with_upstream(host, port, connect_timeout, upstream, scope).await?;
fetch_via_raw_tls_stream(stream, sni, connect_timeout, proxy_protocol).await fetch_via_raw_tls_stream(stream, sni, connect_timeout, proxy_protocol).await
} }
@ -675,6 +679,7 @@ async fn fetch_via_rustls(
sni: &str, sni: &str,
connect_timeout: Duration, connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>, upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
scope: Option<&str>,
proxy_protocol: u8, proxy_protocol: u8,
unix_sock: Option<&str>, unix_sock: Option<&str>,
) -> Result<TlsFetchResult> { ) -> Result<TlsFetchResult> {
@ -710,7 +715,7 @@ async fn fetch_via_rustls(
#[cfg(not(unix))] #[cfg(not(unix))]
let _ = unix_sock; let _ = unix_sock;
let stream = connect_tcp_with_upstream(host, port, connect_timeout, upstream).await?; let stream = connect_tcp_with_upstream(host, port, connect_timeout, upstream, scope).await?;
fetch_via_rustls_stream(stream, host, sni, proxy_protocol).await fetch_via_rustls_stream(stream, host, sni, proxy_protocol).await
} }
@ -726,6 +731,7 @@ pub async fn fetch_real_tls(
sni: &str, sni: &str,
connect_timeout: Duration, connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>, upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
scope: Option<&str>,
proxy_protocol: u8, proxy_protocol: u8,
unix_sock: Option<&str>, unix_sock: Option<&str>,
) -> Result<TlsFetchResult> { ) -> Result<TlsFetchResult> {
@ -735,6 +741,7 @@ pub async fn fetch_real_tls(
sni, sni,
connect_timeout, connect_timeout,
upstream.clone(), upstream.clone(),
scope,
proxy_protocol, proxy_protocol,
unix_sock, unix_sock,
) )
@ -753,6 +760,7 @@ pub async fn fetch_real_tls(
sni, sni,
connect_timeout, connect_timeout,
upstream, upstream,
scope,
proxy_protocol, proxy_protocol,
unix_sock, unix_sock,
) )

View File

@ -74,6 +74,64 @@ impl WriterContour {
} }
} }
/// Lifecycle state of one IP family's ME connectivity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum MeFamilyRuntimeState {
    Healthy = 0,
    Degraded = 1,
    Suppressed = 2,
    Recovering = 3,
}

impl MeFamilyRuntimeState {
    /// Decodes a stored discriminant; any unrecognized value falls back to `Healthy`.
    pub(crate) fn from_u8(raw: u8) -> Self {
        const STATES: [MeFamilyRuntimeState; 4] = [
            MeFamilyRuntimeState::Healthy,
            MeFamilyRuntimeState::Degraded,
            MeFamilyRuntimeState::Suppressed,
            MeFamilyRuntimeState::Recovering,
        ];
        STATES
            .get(usize::from(raw))
            .copied()
            .unwrap_or(MeFamilyRuntimeState::Healthy)
    }

    /// Stable lowercase label used by runtime API payloads.
    pub(crate) fn as_str(self) -> &'static str {
        match self {
            Self::Recovering => "recovering",
            Self::Suppressed => "suppressed",
            Self::Degraded => "degraded",
            Self::Healthy => "healthy",
        }
    }
}
/// Why the most recent stale-writer drain was allowed or blocked.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum MeDrainGateReason {
    Open = 0,
    CoverageQuorum = 1,
    Redundancy = 2,
    SuppressionActive = 3,
}

impl MeDrainGateReason {
    /// Decodes a stored discriminant; any unrecognized value falls back to `Open`.
    pub(crate) fn from_u8(raw: u8) -> Self {
        const REASONS: [MeDrainGateReason; 4] = [
            MeDrainGateReason::Open,
            MeDrainGateReason::CoverageQuorum,
            MeDrainGateReason::Redundancy,
            MeDrainGateReason::SuppressionActive,
        ];
        REASONS
            .get(usize::from(raw))
            .copied()
            .unwrap_or(MeDrainGateReason::Open)
    }

    /// Stable snake_case label exposed through the runtime API.
    pub(crate) fn as_str(self) -> &'static str {
        match self {
            Self::SuppressionActive => "suppression_active",
            Self::Redundancy => "redundancy",
            Self::CoverageQuorum => "coverage_quorum",
            Self::Open => "open",
        }
    }
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct SecretSnapshot { pub struct SecretSnapshot {
pub epoch: u64, pub epoch: u64,
@ -202,6 +260,20 @@ pub struct MePool {
pub(super) me_health_interval_ms_unhealthy: AtomicU64, pub(super) me_health_interval_ms_unhealthy: AtomicU64,
pub(super) me_health_interval_ms_healthy: AtomicU64, pub(super) me_health_interval_ms_healthy: AtomicU64,
pub(super) me_warn_rate_limit_ms: AtomicU64, pub(super) me_warn_rate_limit_ms: AtomicU64,
pub(super) me_family_v4_runtime_state: AtomicU8,
pub(super) me_family_v6_runtime_state: AtomicU8,
pub(super) me_family_v4_state_since_epoch_secs: AtomicU64,
pub(super) me_family_v6_state_since_epoch_secs: AtomicU64,
pub(super) me_family_v4_suppressed_until_epoch_secs: AtomicU64,
pub(super) me_family_v6_suppressed_until_epoch_secs: AtomicU64,
pub(super) me_family_v4_fail_streak: AtomicU32,
pub(super) me_family_v6_fail_streak: AtomicU32,
pub(super) me_family_v4_recover_success_streak: AtomicU32,
pub(super) me_family_v6_recover_success_streak: AtomicU32,
pub(super) me_last_drain_gate_route_quorum_ok: AtomicBool,
pub(super) me_last_drain_gate_redundancy_ok: AtomicBool,
pub(super) me_last_drain_gate_block_reason: AtomicU8,
pub(super) me_last_drain_gate_updated_at_epoch_secs: AtomicU64,
pub(super) runtime_ready: AtomicBool, pub(super) runtime_ready: AtomicBool,
pool_size: usize, pool_size: usize,
pub(super) preferred_endpoints_by_dc: Arc<RwLock<HashMap<i32, Vec<SocketAddr>>>>, pub(super) preferred_endpoints_by_dc: Arc<RwLock<HashMap<i32, Vec<SocketAddr>>>>,
@ -512,6 +584,20 @@ impl MePool {
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)), me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)), me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)),
me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)), me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)),
me_family_v4_runtime_state: AtomicU8::new(MeFamilyRuntimeState::Healthy as u8),
me_family_v6_runtime_state: AtomicU8::new(MeFamilyRuntimeState::Healthy as u8),
me_family_v4_state_since_epoch_secs: AtomicU64::new(Self::now_epoch_secs()),
me_family_v6_state_since_epoch_secs: AtomicU64::new(Self::now_epoch_secs()),
me_family_v4_suppressed_until_epoch_secs: AtomicU64::new(0),
me_family_v6_suppressed_until_epoch_secs: AtomicU64::new(0),
me_family_v4_fail_streak: AtomicU32::new(0),
me_family_v6_fail_streak: AtomicU32::new(0),
me_family_v4_recover_success_streak: AtomicU32::new(0),
me_family_v6_recover_success_streak: AtomicU32::new(0),
me_last_drain_gate_route_quorum_ok: AtomicBool::new(false),
me_last_drain_gate_redundancy_ok: AtomicBool::new(false),
me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8),
me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(Self::now_epoch_secs()),
runtime_ready: AtomicBool::new(false), runtime_ready: AtomicBool::new(false),
preferred_endpoints_by_dc: Arc::new(RwLock::new(preferred_endpoints_by_dc)), preferred_endpoints_by_dc: Arc::new(RwLock::new(preferred_endpoints_by_dc)),
}) })
@ -529,6 +615,153 @@ impl MePool {
self.runtime_ready.load(Ordering::Relaxed) self.runtime_ready.load(Ordering::Relaxed)
} }
/// Publishes the full runtime-state tuple for one IP family into the
/// per-family atomic cells (relaxed stores; readers tolerate tearing
/// across fields).
pub(super) fn set_family_runtime_state(
    &self,
    family: IpFamily,
    state: MeFamilyRuntimeState,
    state_since_epoch_secs: u64,
    suppressed_until_epoch_secs: u64,
    fail_streak: u32,
    recover_success_streak: u32,
) {
    // Pick the family's cells once, then store each field in order.
    let (state_cell, since_cell, suppressed_cell, fail_cell, recover_cell) = match family {
        IpFamily::V4 => (
            &self.me_family_v4_runtime_state,
            &self.me_family_v4_state_since_epoch_secs,
            &self.me_family_v4_suppressed_until_epoch_secs,
            &self.me_family_v4_fail_streak,
            &self.me_family_v4_recover_success_streak,
        ),
        IpFamily::V6 => (
            &self.me_family_v6_runtime_state,
            &self.me_family_v6_state_since_epoch_secs,
            &self.me_family_v6_suppressed_until_epoch_secs,
            &self.me_family_v6_fail_streak,
            &self.me_family_v6_recover_success_streak,
        ),
    };
    state_cell.store(state as u8, Ordering::Relaxed);
    since_cell.store(state_since_epoch_secs, Ordering::Relaxed);
    suppressed_cell.store(suppressed_until_epoch_secs, Ordering::Relaxed);
    fail_cell.store(fail_streak, Ordering::Relaxed);
    recover_cell.store(recover_success_streak, Ordering::Relaxed);
}
/// Current runtime state for the given family, decoded from its atomic cell.
pub(crate) fn family_runtime_state(&self, family: IpFamily) -> MeFamilyRuntimeState {
    let raw = match family {
        IpFamily::V4 => self.me_family_v4_runtime_state.load(Ordering::Relaxed),
        IpFamily::V6 => self.me_family_v6_runtime_state.load(Ordering::Relaxed),
    };
    MeFamilyRuntimeState::from_u8(raw)
}
/// Epoch seconds at which the family entered its current runtime state.
pub(crate) fn family_runtime_state_since_epoch_secs(&self, family: IpFamily) -> u64 {
    let cell = match family {
        IpFamily::V4 => &self.me_family_v4_state_since_epoch_secs,
        IpFamily::V6 => &self.me_family_v6_state_since_epoch_secs,
    };
    cell.load(Ordering::Relaxed)
}
/// Suppression deadline for the family in epoch seconds (0 = not suppressed).
pub(crate) fn family_suppressed_until_epoch_secs(&self, family: IpFamily) -> u64 {
    let cell = match family {
        IpFamily::V4 => &self.me_family_v4_suppressed_until_epoch_secs,
        IpFamily::V6 => &self.me_family_v6_suppressed_until_epoch_secs,
    };
    cell.load(Ordering::Relaxed)
}
/// Consecutive failure count recorded for the family.
pub(crate) fn family_fail_streak(&self, family: IpFamily) -> u32 {
    let counter = match family {
        IpFamily::V4 => &self.me_family_v4_fail_streak,
        IpFamily::V6 => &self.me_family_v6_fail_streak,
    };
    counter.load(Ordering::Relaxed)
}
/// Consecutive success count recorded while the family is recovering.
pub(crate) fn family_recover_success_streak(&self, family: IpFamily) -> u32 {
    let counter = match family {
        IpFamily::V4 => &self.me_family_v4_recover_success_streak,
        IpFamily::V6 => &self.me_family_v6_recover_success_streak,
    };
    counter.load(Ordering::Relaxed)
}
/// True while the family's suppression deadline lies strictly in the future
/// relative to `now_epoch_secs` (a zero deadline is never in the future).
pub(crate) fn is_family_temporarily_suppressed(
    &self,
    family: IpFamily,
    now_epoch_secs: u64,
) -> bool {
    now_epoch_secs < self.family_suppressed_until_epoch_secs(family)
}
/// A family counts toward drain coverage only when it is enabled by the
/// routing decision AND not currently under temporary suppression.
pub(super) fn family_enabled_for_drain_coverage(
    &self,
    family: IpFamily,
    now_epoch_secs: u64,
) -> bool {
    let configured = match family {
        IpFamily::V4 => self.decision.ipv4_me,
        IpFamily::V6 => self.decision.ipv6_me,
    };
    if !configured {
        return false;
    }
    !self.is_family_temporarily_suppressed(family, now_epoch_secs)
}
/// Publishes the outcome of the most recent drain-gate evaluation so API
/// snapshots can report why a stale-writer drain was allowed or blocked.
pub(super) fn set_last_drain_gate(
    &self,
    route_quorum_ok: bool,
    redundancy_ok: bool,
    block_reason: MeDrainGateReason,
    updated_at_epoch_secs: u64,
) {
    self.me_last_drain_gate_route_quorum_ok
        .store(route_quorum_ok, Ordering::Relaxed);
    self.me_last_drain_gate_redundancy_ok
        .store(redundancy_ok, Ordering::Relaxed);
    // Stored as the u8 discriminant; decoded via MeDrainGateReason::from_u8.
    self.me_last_drain_gate_block_reason
        .store(block_reason as u8, Ordering::Relaxed);
    self.me_last_drain_gate_updated_at_epoch_secs
        .store(updated_at_epoch_secs, Ordering::Relaxed);
}
/// Last published gate verdict: did weighted route coverage meet quorum?
pub(crate) fn last_drain_gate_route_quorum_ok(&self) -> bool {
    self.me_last_drain_gate_route_quorum_ok
        .load(Ordering::Relaxed)
}
/// Last published gate verdict: was every desired DC group covered?
pub(crate) fn last_drain_gate_redundancy_ok(&self) -> bool {
    self.me_last_drain_gate_redundancy_ok
        .load(Ordering::Relaxed)
}
/// Last published block reason, decoded from its stored u8 discriminant.
pub(crate) fn last_drain_gate_block_reason(&self) -> MeDrainGateReason {
    MeDrainGateReason::from_u8(
        self.me_last_drain_gate_block_reason
            .load(Ordering::Relaxed),
    )
}
/// Epoch seconds at which the gate verdict was last published.
pub(crate) fn last_drain_gate_updated_at_epoch_secs(&self) -> u64 {
    self.me_last_drain_gate_updated_at_epoch_secs
        .load(Ordering::Relaxed)
}
pub fn update_runtime_reinit_policy( pub fn update_runtime_reinit_policy(
&self, &self,
hardswap: bool, hardswap: bool,
@ -1042,9 +1275,10 @@ impl MePool {
} }
pub(super) async fn active_coverage_required_total(&self) -> usize { pub(super) async fn active_coverage_required_total(&self) -> usize {
let now_epoch_secs = Self::now_epoch_secs();
let mut endpoints_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new(); let mut endpoints_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
if self.decision.ipv4_me { if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch_secs) {
let map = self.proxy_map_v4.read().await; let map = self.proxy_map_v4.read().await;
for (dc, addrs) in map.iter() { for (dc, addrs) in map.iter() {
let entry = endpoints_by_dc.entry(*dc).or_default(); let entry = endpoints_by_dc.entry(*dc).or_default();
@ -1054,7 +1288,7 @@ impl MePool {
} }
} }
if self.decision.ipv6_me { if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch_secs) {
let map = self.proxy_map_v6.read().await; let map = self.proxy_map_v6.read().await;
for (dc, addrs) in map.iter() { for (dc, addrs) in map.iter() {
let entry = endpoints_by_dc.entry(*dc).or_default(); let entry = endpoints_by_dc.entry(*dc).or_default();

View File

@ -179,9 +179,10 @@ impl MePool {
} }
async fn endpoints_for_dc(&self, target_dc: i32) -> Vec<SocketAddr> { async fn endpoints_for_dc(&self, target_dc: i32) -> Vec<SocketAddr> {
let now_epoch_secs = Self::now_epoch_secs();
let mut endpoints = HashSet::<SocketAddr>::new(); let mut endpoints = HashSet::<SocketAddr>::new();
if self.decision.ipv4_me { if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch_secs) {
let map = self.proxy_map_v4.read().await; let map = self.proxy_map_v4.read().await;
if let Some(addrs) = map.get(&target_dc) { if let Some(addrs) = map.get(&target_dc) {
for (ip, port) in addrs { for (ip, port) in addrs {
@ -190,7 +191,7 @@ impl MePool {
} }
} }
if self.decision.ipv6_me { if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch_secs) {
let map = self.proxy_map_v6.read().await; let map = self.proxy_map_v6.read().await;
if let Some(addrs) = map.get(&target_dc) { if let Some(addrs) = map.get(&target_dc) {
for (ip, port) in addrs { for (ip, port) in addrs {

View File

@ -11,8 +11,9 @@ use tracing::{debug, info, warn};
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::network::IpFamily;
use super::pool::{MePool, WriterContour}; use super::pool::{MeDrainGateReason, MePool, WriterContour};
const ME_HARDSWAP_PENDING_TTL_SECS: u64 = 1800; const ME_HARDSWAP_PENDING_TTL_SECS: u64 = 1800;
@ -120,9 +121,10 @@ impl MePool {
} }
async fn desired_dc_endpoints(&self) -> HashMap<i32, HashSet<SocketAddr>> { async fn desired_dc_endpoints(&self) -> HashMap<i32, HashSet<SocketAddr>> {
let now_epoch_secs = Self::now_epoch_secs();
let mut out: HashMap<i32, HashSet<SocketAddr>> = HashMap::new(); let mut out: HashMap<i32, HashSet<SocketAddr>> = HashMap::new();
if self.decision.ipv4_me { if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch_secs) {
let map_v4 = self.proxy_map_v4.read().await.clone(); let map_v4 = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map_v4 { for (dc, addrs) in map_v4 {
let entry = out.entry(dc).or_default(); let entry = out.entry(dc).or_default();
@ -132,7 +134,7 @@ impl MePool {
} }
} }
if self.decision.ipv6_me { if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch_secs) {
let map_v6 = self.proxy_map_v6.read().await.clone(); let map_v6 = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map_v6 { for (dc, addrs) in map_v6 {
let entry = out.entry(dc).or_default(); let entry = out.entry(dc).or_default();
@ -345,13 +347,23 @@ impl MePool {
pub async fn zero_downtime_reinit_after_map_change(self: &Arc<Self>, rng: &SecureRandom) { pub async fn zero_downtime_reinit_after_map_change(self: &Arc<Self>, rng: &SecureRandom) {
let desired_by_dc = self.desired_dc_endpoints().await; let desired_by_dc = self.desired_dc_endpoints().await;
let now_epoch_secs = Self::now_epoch_secs();
let v4_suppressed = self.is_family_temporarily_suppressed(IpFamily::V4, now_epoch_secs);
let v6_suppressed = self.is_family_temporarily_suppressed(IpFamily::V6, now_epoch_secs);
if desired_by_dc.is_empty() { if desired_by_dc.is_empty() {
warn!("ME endpoint map is empty; skipping stale writer drain"); warn!("ME endpoint map is empty; skipping stale writer drain");
let reason = if (self.decision.ipv4_me && v4_suppressed)
|| (self.decision.ipv6_me && v6_suppressed)
{
MeDrainGateReason::SuppressionActive
} else {
MeDrainGateReason::CoverageQuorum
};
self.set_last_drain_gate(false, false, reason, now_epoch_secs);
return; return;
} }
let desired_map_hash = Self::desired_map_hash(&desired_by_dc); let desired_map_hash = Self::desired_map_hash(&desired_by_dc);
let now_epoch_secs = Self::now_epoch_secs();
let previous_generation = self.current_generation(); let previous_generation = self.current_generation();
let hardswap = self.hardswap.load(Ordering::Relaxed); let hardswap = self.hardswap.load(Ordering::Relaxed);
let generation = if hardswap { let generation = if hardswap {
@ -422,7 +434,17 @@ impl MePool {
.load(Ordering::Relaxed), .load(Ordering::Relaxed),
); );
let (coverage_ratio, missing_dc) = Self::coverage_ratio(&desired_by_dc, &active_writer_addrs); let (coverage_ratio, missing_dc) = Self::coverage_ratio(&desired_by_dc, &active_writer_addrs);
let mut route_quorum_ok = coverage_ratio >= min_ratio;
let mut redundancy_ok = missing_dc.is_empty();
let mut redundancy_missing_dc = missing_dc.clone();
let mut gate_coverage_ratio = coverage_ratio;
if !hardswap && coverage_ratio < min_ratio { if !hardswap && coverage_ratio < min_ratio {
self.set_last_drain_gate(
false,
redundancy_ok,
MeDrainGateReason::CoverageQuorum,
now_epoch_secs,
);
warn!( warn!(
previous_generation, previous_generation,
generation, generation,
@ -443,7 +465,17 @@ impl MePool {
.collect(); .collect();
let (fresh_coverage_ratio, fresh_missing_dc) = let (fresh_coverage_ratio, fresh_missing_dc) =
Self::coverage_ratio(&desired_by_dc, &fresh_writer_addrs); Self::coverage_ratio(&desired_by_dc, &fresh_writer_addrs);
if !fresh_missing_dc.is_empty() { route_quorum_ok = fresh_coverage_ratio >= min_ratio;
redundancy_ok = fresh_missing_dc.is_empty();
redundancy_missing_dc = fresh_missing_dc.clone();
gate_coverage_ratio = fresh_coverage_ratio;
if fresh_coverage_ratio < min_ratio {
self.set_last_drain_gate(
false,
redundancy_ok,
MeDrainGateReason::CoverageQuorum,
now_epoch_secs,
);
warn!( warn!(
previous_generation, previous_generation,
generation, generation,
@ -453,13 +485,16 @@ impl MePool {
); );
return; return;
} }
} else if !missing_dc.is_empty() { }
self.set_last_drain_gate(route_quorum_ok, redundancy_ok, MeDrainGateReason::Open, now_epoch_secs);
if !redundancy_ok {
warn!( warn!(
missing_dc = ?missing_dc, missing_dc = ?redundancy_missing_dc,
// Keep stale writers alive when fresh coverage is incomplete. coverage_ratio = format_args!("{gate_coverage_ratio:.3}"),
"ME reinit coverage incomplete; keeping stale writers" min_ratio = format_args!("{min_ratio:.3}"),
"ME reinit proceeds with weighted quorum while some DC groups remain uncovered"
); );
return;
} }
if hardswap { if hardswap {

View File

@ -1,7 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Instant; use std::time::Instant;
use super::pool::{MePool, RefillDcKey}; use super::pool::{MeDrainGateReason, MePool, RefillDcKey};
use crate::network::IpFamily; use crate::network::IpFamily;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -36,6 +36,24 @@ pub(crate) struct MeApiNatStunSnapshot {
pub stun_backoff_remaining_ms: Option<u64>, pub stun_backoff_remaining_ms: Option<u64>,
} }
/// Point-in-time view of one IP family's ME runtime state, shaped for the API layer.
#[derive(Clone, Debug)]
pub(crate) struct MeApiFamilyStateSnapshot {
/// Family label: "v4" or "v6".
pub family: &'static str,
/// Stable lowercase state label produced by the pool's state enum.
pub state: &'static str,
/// Epoch seconds when the current state was entered.
pub state_since_epoch_secs: u64,
/// Suppression deadline; `None` when the stored deadline is zero.
pub suppressed_until_epoch_secs: Option<u64>,
/// Consecutive failure count.
pub fail_streak: u32,
/// Consecutive success count while recovering.
pub recover_success_streak: u32,
}
/// Point-in-time view of the last drain-gate verdict, shaped for the API layer.
#[derive(Clone, Debug)]
pub(crate) struct MeApiDrainGateSnapshot {
/// Whether weighted route coverage met quorum at the last evaluation.
pub route_quorum_ok: bool,
/// Whether every desired DC group had fresh coverage.
pub redundancy_ok: bool,
/// Stable snake_case reason label from `MeDrainGateReason::as_str`.
pub block_reason: &'static str,
/// Epoch seconds when the verdict was published.
pub updated_at_epoch_secs: u64,
}
impl MePool { impl MePool {
pub(crate) async fn api_refill_snapshot(&self) -> MeApiRefillSnapshot { pub(crate) async fn api_refill_snapshot(&self) -> MeApiRefillSnapshot {
let inflight_endpoints_total = self.refill_inflight.lock().await.len(); let inflight_endpoints_total = self.refill_inflight.lock().await.len();
@ -125,4 +143,35 @@ impl MePool {
stun_backoff_remaining_ms, stun_backoff_remaining_ms,
} }
} }
/// Builds one snapshot entry per IP family (v4 first, then v6) from the
/// pool's atomic runtime-state cells.
pub(crate) fn api_family_state_snapshot(&self) -> Vec<MeApiFamilyStateSnapshot> {
    let mut snapshots = Vec::with_capacity(2);
    for family in [IpFamily::V4, IpFamily::V6] {
        let label = match family {
            IpFamily::V4 => "v4",
            IpFamily::V6 => "v6",
        };
        // A zero deadline means "never suppressed" and is reported as None.
        let suppressed_until = self.family_suppressed_until_epoch_secs(family);
        snapshots.push(MeApiFamilyStateSnapshot {
            family: label,
            state: self.family_runtime_state(family).as_str(),
            state_since_epoch_secs: self.family_runtime_state_since_epoch_secs(family),
            suppressed_until_epoch_secs: if suppressed_until == 0 {
                None
            } else {
                Some(suppressed_until)
            },
            fail_streak: self.family_fail_streak(family),
            recover_success_streak: self.family_recover_success_streak(family),
        });
    }
    snapshots
}
/// Assembles the last published drain-gate verdict into an API snapshot,
/// rendering the stored reason as its stable string label.
pub(crate) fn api_drain_gate_snapshot(&self) -> MeApiDrainGateSnapshot {
    MeApiDrainGateSnapshot {
        route_quorum_ok: self.last_drain_gate_route_quorum_ok(),
        redundancy_ok: self.last_drain_gate_redundancy_ok(),
        block_reason: self.last_drain_gate_block_reason().as_str(),
        updated_at_epoch_secs: self.last_drain_gate_updated_at_epoch_secs(),
    }
}
} }