ME Draining on Dual-Stack

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-03-20 16:07:12 +03:00
parent 5c0eb6dbe8
commit 269ba537ad
6 changed files with 664 additions and 37 deletions

View File

@@ -74,6 +74,64 @@ impl WriterContour {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum MeFamilyRuntimeState {
Healthy = 0,
Degraded = 1,
Suppressed = 2,
Recovering = 3,
}
impl MeFamilyRuntimeState {
pub(crate) fn from_u8(value: u8) -> Self {
match value {
1 => Self::Degraded,
2 => Self::Suppressed,
3 => Self::Recovering,
_ => Self::Healthy,
}
}
pub(crate) fn as_str(self) -> &'static str {
match self {
Self::Healthy => "healthy",
Self::Degraded => "degraded",
Self::Suppressed => "suppressed",
Self::Recovering => "recovering",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum MeDrainGateReason {
Open = 0,
CoverageQuorum = 1,
Redundancy = 2,
SuppressionActive = 3,
}
impl MeDrainGateReason {
pub(crate) fn from_u8(value: u8) -> Self {
match value {
1 => Self::CoverageQuorum,
2 => Self::Redundancy,
3 => Self::SuppressionActive,
_ => Self::Open,
}
}
pub(crate) fn as_str(self) -> &'static str {
match self {
Self::Open => "open",
Self::CoverageQuorum => "coverage_quorum",
Self::Redundancy => "redundancy",
Self::SuppressionActive => "suppression_active",
}
}
}
#[derive(Debug, Clone)]
pub struct SecretSnapshot {
pub epoch: u64,
@@ -203,6 +261,20 @@ pub struct MePool {
pub(super) me_health_interval_ms_unhealthy: AtomicU64,
pub(super) me_health_interval_ms_healthy: AtomicU64,
pub(super) me_warn_rate_limit_ms: AtomicU64,
pub(super) me_family_v4_runtime_state: AtomicU8,
pub(super) me_family_v6_runtime_state: AtomicU8,
pub(super) me_family_v4_state_since_epoch_secs: AtomicU64,
pub(super) me_family_v6_state_since_epoch_secs: AtomicU64,
pub(super) me_family_v4_suppressed_until_epoch_secs: AtomicU64,
pub(super) me_family_v6_suppressed_until_epoch_secs: AtomicU64,
pub(super) me_family_v4_fail_streak: AtomicU32,
pub(super) me_family_v6_fail_streak: AtomicU32,
pub(super) me_family_v4_recover_success_streak: AtomicU32,
pub(super) me_family_v6_recover_success_streak: AtomicU32,
pub(super) me_last_drain_gate_route_quorum_ok: AtomicBool,
pub(super) me_last_drain_gate_redundancy_ok: AtomicBool,
pub(super) me_last_drain_gate_block_reason: AtomicU8,
pub(super) me_last_drain_gate_updated_at_epoch_secs: AtomicU64,
pub(super) runtime_ready: AtomicBool,
pool_size: usize,
pub(super) preferred_endpoints_by_dc: Arc<RwLock<HashMap<i32, Vec<SocketAddr>>>>,
@@ -518,6 +590,20 @@ impl MePool {
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)),
me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)),
me_family_v4_runtime_state: AtomicU8::new(MeFamilyRuntimeState::Healthy as u8),
me_family_v6_runtime_state: AtomicU8::new(MeFamilyRuntimeState::Healthy as u8),
me_family_v4_state_since_epoch_secs: AtomicU64::new(Self::now_epoch_secs()),
me_family_v6_state_since_epoch_secs: AtomicU64::new(Self::now_epoch_secs()),
me_family_v4_suppressed_until_epoch_secs: AtomicU64::new(0),
me_family_v6_suppressed_until_epoch_secs: AtomicU64::new(0),
me_family_v4_fail_streak: AtomicU32::new(0),
me_family_v6_fail_streak: AtomicU32::new(0),
me_family_v4_recover_success_streak: AtomicU32::new(0),
me_family_v6_recover_success_streak: AtomicU32::new(0),
me_last_drain_gate_route_quorum_ok: AtomicBool::new(false),
me_last_drain_gate_redundancy_ok: AtomicBool::new(false),
me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8),
me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(Self::now_epoch_secs()),
runtime_ready: AtomicBool::new(false),
preferred_endpoints_by_dc: Arc::new(RwLock::new(preferred_endpoints_by_dc)),
})
@@ -535,6 +621,153 @@ impl MePool {
self.runtime_ready.load(Ordering::Relaxed)
}
pub(super) fn set_family_runtime_state(
&self,
family: IpFamily,
state: MeFamilyRuntimeState,
state_since_epoch_secs: u64,
suppressed_until_epoch_secs: u64,
fail_streak: u32,
recover_success_streak: u32,
) {
match family {
IpFamily::V4 => {
self.me_family_v4_runtime_state
.store(state as u8, Ordering::Relaxed);
self.me_family_v4_state_since_epoch_secs
.store(state_since_epoch_secs, Ordering::Relaxed);
self.me_family_v4_suppressed_until_epoch_secs
.store(suppressed_until_epoch_secs, Ordering::Relaxed);
self.me_family_v4_fail_streak
.store(fail_streak, Ordering::Relaxed);
self.me_family_v4_recover_success_streak
.store(recover_success_streak, Ordering::Relaxed);
}
IpFamily::V6 => {
self.me_family_v6_runtime_state
.store(state as u8, Ordering::Relaxed);
self.me_family_v6_state_since_epoch_secs
.store(state_since_epoch_secs, Ordering::Relaxed);
self.me_family_v6_suppressed_until_epoch_secs
.store(suppressed_until_epoch_secs, Ordering::Relaxed);
self.me_family_v6_fail_streak
.store(fail_streak, Ordering::Relaxed);
self.me_family_v6_recover_success_streak
.store(recover_success_streak, Ordering::Relaxed);
}
}
}
pub(crate) fn family_runtime_state(&self, family: IpFamily) -> MeFamilyRuntimeState {
match family {
IpFamily::V4 => MeFamilyRuntimeState::from_u8(
self.me_family_v4_runtime_state.load(Ordering::Relaxed),
),
IpFamily::V6 => MeFamilyRuntimeState::from_u8(
self.me_family_v6_runtime_state.load(Ordering::Relaxed),
),
}
}
pub(crate) fn family_runtime_state_since_epoch_secs(&self, family: IpFamily) -> u64 {
match family {
IpFamily::V4 => self
.me_family_v4_state_since_epoch_secs
.load(Ordering::Relaxed),
IpFamily::V6 => self
.me_family_v6_state_since_epoch_secs
.load(Ordering::Relaxed),
}
}
pub(crate) fn family_suppressed_until_epoch_secs(&self, family: IpFamily) -> u64 {
match family {
IpFamily::V4 => self
.me_family_v4_suppressed_until_epoch_secs
.load(Ordering::Relaxed),
IpFamily::V6 => self
.me_family_v6_suppressed_until_epoch_secs
.load(Ordering::Relaxed),
}
}
pub(crate) fn family_fail_streak(&self, family: IpFamily) -> u32 {
match family {
IpFamily::V4 => self.me_family_v4_fail_streak.load(Ordering::Relaxed),
IpFamily::V6 => self.me_family_v6_fail_streak.load(Ordering::Relaxed),
}
}
pub(crate) fn family_recover_success_streak(&self, family: IpFamily) -> u32 {
match family {
IpFamily::V4 => self
.me_family_v4_recover_success_streak
.load(Ordering::Relaxed),
IpFamily::V6 => self
.me_family_v6_recover_success_streak
.load(Ordering::Relaxed),
}
}
pub(crate) fn is_family_temporarily_suppressed(
&self,
family: IpFamily,
now_epoch_secs: u64,
) -> bool {
self.family_suppressed_until_epoch_secs(family) > now_epoch_secs
}
pub(super) fn family_enabled_for_drain_coverage(
&self,
family: IpFamily,
now_epoch_secs: u64,
) -> bool {
let configured = match family {
IpFamily::V4 => self.decision.ipv4_me,
IpFamily::V6 => self.decision.ipv6_me,
};
configured && !self.is_family_temporarily_suppressed(family, now_epoch_secs)
}
pub(super) fn set_last_drain_gate(
&self,
route_quorum_ok: bool,
redundancy_ok: bool,
block_reason: MeDrainGateReason,
updated_at_epoch_secs: u64,
) {
self.me_last_drain_gate_route_quorum_ok
.store(route_quorum_ok, Ordering::Relaxed);
self.me_last_drain_gate_redundancy_ok
.store(redundancy_ok, Ordering::Relaxed);
self.me_last_drain_gate_block_reason
.store(block_reason as u8, Ordering::Relaxed);
self.me_last_drain_gate_updated_at_epoch_secs
.store(updated_at_epoch_secs, Ordering::Relaxed);
}
pub(crate) fn last_drain_gate_route_quorum_ok(&self) -> bool {
self.me_last_drain_gate_route_quorum_ok
.load(Ordering::Relaxed)
}
pub(crate) fn last_drain_gate_redundancy_ok(&self) -> bool {
self.me_last_drain_gate_redundancy_ok
.load(Ordering::Relaxed)
}
pub(crate) fn last_drain_gate_block_reason(&self) -> MeDrainGateReason {
MeDrainGateReason::from_u8(
self.me_last_drain_gate_block_reason
.load(Ordering::Relaxed),
)
}
pub(crate) fn last_drain_gate_updated_at_epoch_secs(&self) -> u64 {
self.me_last_drain_gate_updated_at_epoch_secs
.load(Ordering::Relaxed)
}
pub fn update_runtime_reinit_policy(
&self,
hardswap: bool,
@@ -1021,9 +1254,10 @@ impl MePool {
}
pub(super) async fn active_coverage_required_total(&self) -> usize {
let now_epoch_secs = Self::now_epoch_secs();
let mut endpoints_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
if self.decision.ipv4_me {
if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch_secs) {
let map = self.proxy_map_v4.read().await;
for (dc, addrs) in map.iter() {
let entry = endpoints_by_dc.entry(*dc).or_default();
@@ -1033,7 +1267,7 @@ impl MePool {
}
}
if self.decision.ipv6_me {
if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch_secs) {
let map = self.proxy_map_v6.read().await;
for (dc, addrs) in map.iter() {
let entry = endpoints_by_dc.entry(*dc).or_default();