mirror of https://github.com/telemt/telemt.git
Update health.rs
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
parent
58ff0c7971
commit
4a0d88ad43
|
|
@ -7,6 +7,7 @@ use std::time::{Duration, Instant};
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
|
|
||||||
|
use crate::config::MeFloorMode;
|
||||||
use crate::crypto::SecureRandom;
|
use crate::crypto::SecureRandom;
|
||||||
use crate::network::IpFamily;
|
use crate::network::IpFamily;
|
||||||
|
|
||||||
|
|
@ -26,6 +27,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||||
let mut outage_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
let mut outage_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||||
let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new();
|
let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new();
|
||||||
let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||||
|
let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||||
|
let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
|
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
|
||||||
pool.prune_closed_writers().await;
|
pool.prune_closed_writers().await;
|
||||||
|
|
@ -40,6 +43,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||||
&mut outage_next_attempt,
|
&mut outage_next_attempt,
|
||||||
&mut single_endpoint_outage,
|
&mut single_endpoint_outage,
|
||||||
&mut shadow_rotate_deadline,
|
&mut shadow_rotate_deadline,
|
||||||
|
&mut adaptive_idle_since,
|
||||||
|
&mut adaptive_recover_until,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
check_family(
|
check_family(
|
||||||
|
|
@ -53,6 +58,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||||
&mut outage_next_attempt,
|
&mut outage_next_attempt,
|
||||||
&mut single_endpoint_outage,
|
&mut single_endpoint_outage,
|
||||||
&mut shadow_rotate_deadline,
|
&mut shadow_rotate_deadline,
|
||||||
|
&mut adaptive_idle_since,
|
||||||
|
&mut adaptive_recover_until,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
@ -69,6 +76,8 @@ async fn check_family(
|
||||||
outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
|
outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
single_endpoint_outage: &mut HashSet<(i32, IpFamily)>,
|
single_endpoint_outage: &mut HashSet<(i32, IpFamily)>,
|
||||||
shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
|
shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
|
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
|
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
) {
|
) {
|
||||||
let enabled = match family {
|
let enabled = match family {
|
||||||
IpFamily::V4 => pool.decision.ipv4_me,
|
IpFamily::V4 => pool.decision.ipv4_me,
|
||||||
|
|
@ -95,6 +104,11 @@ async fn check_family(
|
||||||
endpoints.dedup();
|
endpoints.dedup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if pool.floor_mode() == MeFloorMode::Static {
|
||||||
|
adaptive_idle_since.clear();
|
||||||
|
adaptive_recover_until.clear();
|
||||||
|
}
|
||||||
|
|
||||||
let mut live_addr_counts = HashMap::<SocketAddr, usize>::new();
|
let mut live_addr_counts = HashMap::<SocketAddr, usize>::new();
|
||||||
let mut live_writer_ids_by_addr = HashMap::<SocketAddr, Vec<u64>>::new();
|
let mut live_writer_ids_by_addr = HashMap::<SocketAddr, Vec<u64>>::new();
|
||||||
for writer in pool.writers.read().await.iter().filter(|w| {
|
for writer in pool.writers.read().await.iter().filter(|w| {
|
||||||
|
|
@ -111,12 +125,21 @@ async fn check_family(
|
||||||
if endpoints.is_empty() {
|
if endpoints.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let required = pool.required_writers_for_dc(endpoints.len());
|
let key = (dc, family);
|
||||||
|
let reduce_for_idle = should_reduce_floor_for_idle(
|
||||||
|
pool,
|
||||||
|
key,
|
||||||
|
&endpoints,
|
||||||
|
&live_writer_ids_by_addr,
|
||||||
|
adaptive_idle_since,
|
||||||
|
adaptive_recover_until,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let required = pool.required_writers_for_dc_with_floor_mode(endpoints.len(), reduce_for_idle);
|
||||||
let alive = endpoints
|
let alive = endpoints
|
||||||
.iter()
|
.iter()
|
||||||
.map(|addr| *live_addr_counts.get(addr).unwrap_or(&0))
|
.map(|addr| *live_addr_counts.get(addr).unwrap_or(&0))
|
||||||
.sum::<usize>();
|
.sum::<usize>();
|
||||||
let key = (dc, family);
|
|
||||||
|
|
||||||
if endpoints.len() == 1 && pool.single_endpoint_outage_mode_enabled() && alive == 0 {
|
if endpoints.len() == 1 && pool.single_endpoint_outage_mode_enabled() && alive == 0 {
|
||||||
if single_endpoint_outage.insert(key) {
|
if single_endpoint_outage.insert(key) {
|
||||||
|
|
@ -148,6 +171,8 @@ async fn check_family(
|
||||||
outage_backoff.remove(&key);
|
outage_backoff.remove(&key);
|
||||||
outage_next_attempt.remove(&key);
|
outage_next_attempt.remove(&key);
|
||||||
shadow_rotate_deadline.remove(&key);
|
shadow_rotate_deadline.remove(&key);
|
||||||
|
adaptive_idle_since.remove(&key);
|
||||||
|
adaptive_recover_until.remove(&key);
|
||||||
info!(
|
info!(
|
||||||
dc = %dc,
|
dc = %dc,
|
||||||
?family,
|
?family,
|
||||||
|
|
@ -262,6 +287,54 @@ async fn check_family(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn should_reduce_floor_for_idle(
|
||||||
|
pool: &Arc<MePool>,
|
||||||
|
key: (i32, IpFamily),
|
||||||
|
endpoints: &[SocketAddr],
|
||||||
|
live_writer_ids_by_addr: &HashMap<SocketAddr, Vec<u64>>,
|
||||||
|
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
|
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
|
) -> bool {
|
||||||
|
if endpoints.len() != 1 || pool.floor_mode() != MeFloorMode::Adaptive {
|
||||||
|
adaptive_idle_since.remove(&key);
|
||||||
|
adaptive_recover_until.remove(&key);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
let endpoint = endpoints[0];
|
||||||
|
let writer_ids = live_writer_ids_by_addr
|
||||||
|
.get(&endpoint)
|
||||||
|
.map(Vec::as_slice)
|
||||||
|
.unwrap_or(&[]);
|
||||||
|
let has_bound_clients = has_bound_clients_on_endpoint(pool, writer_ids).await;
|
||||||
|
if has_bound_clients {
|
||||||
|
adaptive_idle_since.remove(&key);
|
||||||
|
adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(recover_until) = adaptive_recover_until.get(&key)
|
||||||
|
&& now < *recover_until
|
||||||
|
{
|
||||||
|
adaptive_idle_since.remove(&key);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
adaptive_recover_until.remove(&key);
|
||||||
|
|
||||||
|
let idle_since = adaptive_idle_since.entry(key).or_insert(now);
|
||||||
|
now.saturating_duration_since(*idle_since) >= pool.adaptive_floor_idle_duration()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn has_bound_clients_on_endpoint(pool: &Arc<MePool>, writer_ids: &[u64]) -> bool {
|
||||||
|
for writer_id in writer_ids {
|
||||||
|
if !pool.registry.is_writer_empty(*writer_id).await {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
async fn recover_single_endpoint_outage(
|
async fn recover_single_endpoint_outage(
|
||||||
pool: &Arc<MePool>,
|
pool: &Arc<MePool>,
|
||||||
rng: &Arc<SecureRandom>,
|
rng: &Arc<SecureRandom>,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue