mirror of https://github.com/telemt/telemt.git
DC writer floor is below required only in runtime
This commit is contained in:
parent
64130dd02e
commit
a80be78345
|
|
@ -1051,6 +1051,10 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
);
|
);
|
||||||
info!("============================================================");
|
info!("============================================================");
|
||||||
|
|
||||||
|
if let Some(ref pool) = me_pool {
|
||||||
|
pool.set_runtime_ready(true);
|
||||||
|
}
|
||||||
|
|
||||||
// Background tasks
|
// Background tasks
|
||||||
let um_clone = upstream_manager.clone();
|
let um_clone = upstream_manager.clone();
|
||||||
let decision_clone = decision.clone();
|
let decision_clone = decision.clone();
|
||||||
|
|
|
||||||
|
|
@ -295,15 +295,27 @@ async fn check_family(
|
||||||
let wait = Duration::from_millis(next_ms)
|
let wait = Duration::from_millis(next_ms)
|
||||||
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
|
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
|
||||||
next_attempt.insert(key, now + wait);
|
next_attempt.insert(key, now + wait);
|
||||||
warn!(
|
if pool.is_runtime_ready() {
|
||||||
dc = %dc,
|
warn!(
|
||||||
?family,
|
dc = %dc,
|
||||||
alive = now_alive,
|
?family,
|
||||||
required,
|
alive = now_alive,
|
||||||
endpoint_count = endpoints.len(),
|
required,
|
||||||
backoff_ms = next_ms,
|
endpoint_count = endpoints.len(),
|
||||||
"DC writer floor is below required level, scheduled reconnect"
|
backoff_ms = next_ms,
|
||||||
);
|
"DC writer floor is below required level, scheduled reconnect"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
dc = %dc,
|
||||||
|
?family,
|
||||||
|
alive = now_alive,
|
||||||
|
required,
|
||||||
|
endpoint_count = endpoints.len(),
|
||||||
|
backoff_ms = next_ms,
|
||||||
|
"DC writer floor is below required level during startup, scheduled reconnect"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if let Some(v) = inflight.get_mut(&key) {
|
if let Some(v) = inflight.get_mut(&key) {
|
||||||
*v = v.saturating_sub(1);
|
*v = v.saturating_sub(1);
|
||||||
|
|
|
||||||
|
|
@ -149,6 +149,7 @@ pub struct MePool {
|
||||||
pub(super) me_route_no_writer_wait: Duration,
|
pub(super) me_route_no_writer_wait: Duration,
|
||||||
pub(super) me_route_inline_recovery_attempts: u32,
|
pub(super) me_route_inline_recovery_attempts: u32,
|
||||||
pub(super) me_route_inline_recovery_wait: Duration,
|
pub(super) me_route_inline_recovery_wait: Duration,
|
||||||
|
pub(super) runtime_ready: AtomicBool,
|
||||||
pool_size: usize,
|
pool_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -355,6 +356,7 @@ impl MePool {
|
||||||
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
|
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
|
||||||
me_route_inline_recovery_attempts,
|
me_route_inline_recovery_attempts,
|
||||||
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
|
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
|
||||||
|
runtime_ready: AtomicBool::new(false),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -362,6 +364,14 @@ impl MePool {
|
||||||
self.active_generation.load(Ordering::Relaxed)
|
self.active_generation.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_runtime_ready(&self, ready: bool) {
|
||||||
|
self.runtime_ready.store(ready, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_runtime_ready(&self) -> bool {
|
||||||
|
self.runtime_ready.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn update_runtime_reinit_policy(
|
pub fn update_runtime_reinit_policy(
|
||||||
&self,
|
&self,
|
||||||
hardswap: bool,
|
hardswap: bool,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue