mirror of https://github.com/telemt/telemt.git
ME Reinit Core
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
parent
0b78583cf5
commit
1c3e0d4e46
|
|
@ -265,9 +265,20 @@ pub struct RoutingCore {
|
||||||
pub(super) preferred_endpoints_by_dc: ArcSwap<HashMap<i32, Vec<SocketAddr>>>,
|
pub(super) preferred_endpoints_by_dc: ArcSwap<HashMap<i32, Vec<SocketAddr>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) struct ReinitCore {
|
||||||
|
pub(super) generation: AtomicU64,
|
||||||
|
pub(super) active_generation: AtomicU64,
|
||||||
|
pub(super) warm_generation: AtomicU64,
|
||||||
|
pub(super) pending_hardswap_generation: AtomicU64,
|
||||||
|
pub(super) pending_hardswap_started_at_epoch_secs: AtomicU64,
|
||||||
|
pub(super) pending_hardswap_map_hash: AtomicU64,
|
||||||
|
pub(super) hardswap: AtomicBool,
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub struct MePool {
|
pub struct MePool {
|
||||||
pub(super) routing: Arc<RoutingCore>,
|
pub(super) routing: Arc<RoutingCore>,
|
||||||
|
pub(super) reinit: Arc<ReinitCore>,
|
||||||
pub(super) decision: NetworkDecision,
|
pub(super) decision: NetworkDecision,
|
||||||
pub(super) upstream: Option<Arc<UpstreamManager>>,
|
pub(super) upstream: Option<Arc<UpstreamManager>>,
|
||||||
pub(super) rng: Arc<SecureRandom>,
|
pub(super) rng: Arc<SecureRandom>,
|
||||||
|
|
@ -343,13 +354,6 @@ pub struct MePool {
|
||||||
pub(super) conn_count: AtomicUsize,
|
pub(super) conn_count: AtomicUsize,
|
||||||
pub(super) draining_active_runtime: AtomicU64,
|
pub(super) draining_active_runtime: AtomicU64,
|
||||||
pub(super) stats: Arc<crate::stats::Stats>,
|
pub(super) stats: Arc<crate::stats::Stats>,
|
||||||
pub(super) generation: AtomicU64,
|
|
||||||
pub(super) active_generation: AtomicU64,
|
|
||||||
pub(super) warm_generation: AtomicU64,
|
|
||||||
pub(super) pending_hardswap_generation: AtomicU64,
|
|
||||||
pub(super) pending_hardswap_started_at_epoch_secs: AtomicU64,
|
|
||||||
pub(super) pending_hardswap_map_hash: AtomicU64,
|
|
||||||
pub(super) hardswap: AtomicBool,
|
|
||||||
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
|
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
|
||||||
pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
|
pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
|
||||||
pub(super) me_pool_drain_ttl_secs: AtomicU64,
|
pub(super) me_pool_drain_ttl_secs: AtomicU64,
|
||||||
|
|
@ -543,6 +547,15 @@ impl MePool {
|
||||||
writer_epoch,
|
writer_epoch,
|
||||||
preferred_endpoints_by_dc: ArcSwap::from_pointee(preferred_endpoints_by_dc),
|
preferred_endpoints_by_dc: ArcSwap::from_pointee(preferred_endpoints_by_dc),
|
||||||
}),
|
}),
|
||||||
|
reinit: Arc::new(ReinitCore {
|
||||||
|
generation: AtomicU64::new(1),
|
||||||
|
active_generation: AtomicU64::new(1),
|
||||||
|
warm_generation: AtomicU64::new(0),
|
||||||
|
pending_hardswap_generation: AtomicU64::new(0),
|
||||||
|
pending_hardswap_started_at_epoch_secs: AtomicU64::new(0),
|
||||||
|
pending_hardswap_map_hash: AtomicU64::new(0),
|
||||||
|
hardswap: AtomicBool::new(hardswap),
|
||||||
|
}),
|
||||||
decision,
|
decision,
|
||||||
upstream,
|
upstream,
|
||||||
rng,
|
rng,
|
||||||
|
|
@ -664,13 +677,6 @@ impl MePool {
|
||||||
refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())),
|
refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())),
|
||||||
conn_count: AtomicUsize::new(0),
|
conn_count: AtomicUsize::new(0),
|
||||||
draining_active_runtime: AtomicU64::new(0),
|
draining_active_runtime: AtomicU64::new(0),
|
||||||
generation: AtomicU64::new(1),
|
|
||||||
active_generation: AtomicU64::new(1),
|
|
||||||
warm_generation: AtomicU64::new(0),
|
|
||||||
pending_hardswap_generation: AtomicU64::new(0),
|
|
||||||
pending_hardswap_started_at_epoch_secs: AtomicU64::new(0),
|
|
||||||
pending_hardswap_map_hash: AtomicU64::new(0),
|
|
||||||
hardswap: AtomicBool::new(hardswap),
|
|
||||||
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
|
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
|
||||||
kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
|
kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
|
||||||
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
|
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
|
||||||
|
|
@ -750,7 +756,7 @@ impl MePool {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn current_generation(&self) -> u64 {
|
pub fn current_generation(&self) -> u64 {
|
||||||
self.active_generation.load(Ordering::Relaxed)
|
self.reinit.active_generation.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_runtime_ready(&self, ready: bool) {
|
pub fn set_runtime_ready(&self, ready: bool) {
|
||||||
|
|
@ -934,7 +940,7 @@ impl MePool {
|
||||||
me_health_interval_ms_healthy: u64,
|
me_health_interval_ms_healthy: u64,
|
||||||
me_warn_rate_limit_ms: u64,
|
me_warn_rate_limit_ms: u64,
|
||||||
) {
|
) {
|
||||||
self.hardswap.store(hardswap, Ordering::Relaxed);
|
self.reinit.hardswap.store(hardswap, Ordering::Relaxed);
|
||||||
self.me_pool_drain_ttl_secs
|
self.me_pool_drain_ttl_secs
|
||||||
.store(drain_ttl_secs, Ordering::Relaxed);
|
.store(drain_ttl_secs, Ordering::Relaxed);
|
||||||
self.me_instadrain.store(instadrain, Ordering::Relaxed);
|
self.me_instadrain.store(instadrain, Ordering::Relaxed);
|
||||||
|
|
|
||||||
|
|
@ -37,16 +37,21 @@ impl MePool {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clear_pending_hardswap_state(&self) {
|
fn clear_pending_hardswap_state(&self) {
|
||||||
self.pending_hardswap_generation.store(0, Ordering::Relaxed);
|
self.reinit.pending_hardswap_generation.store(0, Ordering::Relaxed);
|
||||||
self.pending_hardswap_started_at_epoch_secs
|
self.reinit
|
||||||
|
.pending_hardswap_started_at_epoch_secs
|
||||||
.store(0, Ordering::Relaxed);
|
.store(0, Ordering::Relaxed);
|
||||||
self.pending_hardswap_map_hash.store(0, Ordering::Relaxed);
|
self.reinit
|
||||||
self.warm_generation.store(0, Ordering::Relaxed);
|
.pending_hardswap_map_hash
|
||||||
|
.store(0, Ordering::Relaxed);
|
||||||
|
self.reinit.warm_generation.store(0, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn promote_warm_generation_to_active(&self, generation: u64) {
|
async fn promote_warm_generation_to_active(&self, generation: u64) {
|
||||||
self.active_generation.store(generation, Ordering::Relaxed);
|
self.reinit
|
||||||
self.warm_generation.store(0, Ordering::Relaxed);
|
.active_generation
|
||||||
|
.store(generation, Ordering::Relaxed);
|
||||||
|
self.reinit.warm_generation.store(0, Ordering::Relaxed);
|
||||||
|
|
||||||
let ws = self.writers.read().await;
|
let ws = self.writers.read().await;
|
||||||
for writer in ws.iter() {
|
for writer in ws.iter() {
|
||||||
|
|
@ -369,13 +374,17 @@ impl MePool {
|
||||||
|
|
||||||
let desired_map_hash = Self::desired_map_hash(&desired_by_dc);
|
let desired_map_hash = Self::desired_map_hash(&desired_by_dc);
|
||||||
let previous_generation = self.current_generation();
|
let previous_generation = self.current_generation();
|
||||||
let hardswap = self.hardswap.load(Ordering::Relaxed);
|
let hardswap = self.reinit.hardswap.load(Ordering::Relaxed);
|
||||||
let generation = if hardswap {
|
let generation = if hardswap {
|
||||||
let pending_generation = self.pending_hardswap_generation.load(Ordering::Relaxed);
|
let pending_generation = self
|
||||||
|
.reinit
|
||||||
|
.pending_hardswap_generation
|
||||||
|
.load(Ordering::Relaxed);
|
||||||
let pending_started_at = self
|
let pending_started_at = self
|
||||||
|
.reinit
|
||||||
.pending_hardswap_started_at_epoch_secs
|
.pending_hardswap_started_at_epoch_secs
|
||||||
.load(Ordering::Relaxed);
|
.load(Ordering::Relaxed);
|
||||||
let pending_map_hash = self.pending_hardswap_map_hash.load(Ordering::Relaxed);
|
let pending_map_hash = self.reinit.pending_hardswap_map_hash.load(Ordering::Relaxed);
|
||||||
let pending_age_secs = now_epoch_secs.saturating_sub(pending_started_at);
|
let pending_age_secs = now_epoch_secs.saturating_sub(pending_started_at);
|
||||||
let pending_ttl_expired =
|
let pending_ttl_expired =
|
||||||
pending_started_at > 0 && pending_age_secs > ME_HARDSWAP_PENDING_TTL_SECS;
|
pending_started_at > 0 && pending_age_secs > ME_HARDSWAP_PENDING_TTL_SECS;
|
||||||
|
|
@ -405,24 +414,28 @@ impl MePool {
|
||||||
"ME hardswap pending generation expired by TTL; starting fresh generation"
|
"ME hardswap pending generation expired by TTL; starting fresh generation"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
let next_generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1;
|
let next_generation = self.reinit.generation.fetch_add(1, Ordering::Relaxed) + 1;
|
||||||
self.pending_hardswap_generation
|
self.reinit
|
||||||
|
.pending_hardswap_generation
|
||||||
.store(next_generation, Ordering::Relaxed);
|
.store(next_generation, Ordering::Relaxed);
|
||||||
self.pending_hardswap_started_at_epoch_secs
|
self.reinit
|
||||||
|
.pending_hardswap_started_at_epoch_secs
|
||||||
.store(now_epoch_secs, Ordering::Relaxed);
|
.store(now_epoch_secs, Ordering::Relaxed);
|
||||||
self.pending_hardswap_map_hash
|
self.reinit
|
||||||
|
.pending_hardswap_map_hash
|
||||||
.store(desired_map_hash, Ordering::Relaxed);
|
.store(desired_map_hash, Ordering::Relaxed);
|
||||||
self.warm_generation
|
self.reinit
|
||||||
|
.warm_generation
|
||||||
.store(next_generation, Ordering::Relaxed);
|
.store(next_generation, Ordering::Relaxed);
|
||||||
next_generation
|
next_generation
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
self.clear_pending_hardswap_state();
|
self.clear_pending_hardswap_state();
|
||||||
self.generation.fetch_add(1, Ordering::Relaxed) + 1
|
self.reinit.generation.fetch_add(1, Ordering::Relaxed) + 1
|
||||||
};
|
};
|
||||||
|
|
||||||
if hardswap {
|
if hardswap {
|
||||||
self.warm_generation.store(generation, Ordering::Relaxed);
|
self.reinit.warm_generation.store(generation, Ordering::Relaxed);
|
||||||
self.warmup_generation_for_all_dcs(rng, generation, &desired_by_dc)
|
self.warmup_generation_for_all_dcs(rng, generation, &desired_by_dc)
|
||||||
.await;
|
.await;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -436,6 +436,7 @@ impl MePool {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let now_epoch_secs = Self::now_epoch_secs();
|
let now_epoch_secs = Self::now_epoch_secs();
|
||||||
let pending_started_at = self
|
let pending_started_at = self
|
||||||
|
.reinit
|
||||||
.pending_hardswap_started_at_epoch_secs
|
.pending_hardswap_started_at_epoch_secs
|
||||||
.load(Ordering::Relaxed);
|
.load(Ordering::Relaxed);
|
||||||
let pending_hardswap_age_secs =
|
let pending_hardswap_age_secs =
|
||||||
|
|
@ -477,11 +478,14 @@ impl MePool {
|
||||||
}
|
}
|
||||||
|
|
||||||
MeApiRuntimeSnapshot {
|
MeApiRuntimeSnapshot {
|
||||||
active_generation: self.active_generation.load(Ordering::Relaxed),
|
active_generation: self.reinit.active_generation.load(Ordering::Relaxed),
|
||||||
warm_generation: self.warm_generation.load(Ordering::Relaxed),
|
warm_generation: self.reinit.warm_generation.load(Ordering::Relaxed),
|
||||||
pending_hardswap_generation: self.pending_hardswap_generation.load(Ordering::Relaxed),
|
pending_hardswap_generation: self
|
||||||
|
.reinit
|
||||||
|
.pending_hardswap_generation
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
pending_hardswap_age_secs,
|
pending_hardswap_age_secs,
|
||||||
hardswap_enabled: self.hardswap.load(Ordering::Relaxed),
|
hardswap_enabled: self.reinit.hardswap.load(Ordering::Relaxed),
|
||||||
floor_mode: floor_mode_label(self.floor_mode()),
|
floor_mode: floor_mode_label(self.floor_mode()),
|
||||||
adaptive_floor_idle_secs: self.me_adaptive_floor_idle_secs.load(Ordering::Relaxed),
|
adaptive_floor_idle_secs: self.me_adaptive_floor_idle_secs.load(Ordering::Relaxed),
|
||||||
adaptive_floor_min_writers_single_endpoint: self
|
adaptive_floor_min_writers_single_endpoint: self
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue