Event-driven Drafts

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-07 03:22:01 +03:00
parent 89222e7123
commit 0ff2e95e49
No known key found for this signature in database
4 changed files with 84 additions and 25 deletions

View File

@ -22,6 +22,10 @@ const IDLE_REFRESH_TRIGGER_BASE_SECS: u64 = 45;
const IDLE_REFRESH_TRIGGER_JITTER_SECS: u64 = 5; const IDLE_REFRESH_TRIGGER_JITTER_SECS: u64 = 5;
const IDLE_REFRESH_RETRY_SECS: u64 = 8; const IDLE_REFRESH_RETRY_SECS: u64 = 8;
const IDLE_REFRESH_SUCCESS_GUARD_SECS: u64 = 5; const IDLE_REFRESH_SUCCESS_GUARD_SECS: u64 = 5;
const HEALTH_RECONNECT_BUDGET_PER_CORE: usize = 2;
const HEALTH_RECONNECT_BUDGET_PER_DC: usize = 1;
const HEALTH_RECONNECT_BUDGET_MIN: usize = 4;
const HEALTH_RECONNECT_BUDGET_MAX: usize = 128;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct DcFloorPlanEntry { struct DcFloorPlanEntry {
@ -114,22 +118,23 @@ async fn check_family(
return; return;
} }
let map = match family {
IpFamily::V4 => pool.proxy_map_v4.read().await.clone(),
IpFamily::V6 => pool.proxy_map_v6.read().await.clone(),
};
let mut dc_endpoints = HashMap::<i32, Vec<SocketAddr>>::new(); let mut dc_endpoints = HashMap::<i32, Vec<SocketAddr>>::new();
for (dc, addrs) in map { let map_guard = match family {
let entry = dc_endpoints.entry(dc).or_default(); IpFamily::V4 => pool.proxy_map_v4.read().await,
for (ip, port) in addrs { IpFamily::V6 => pool.proxy_map_v6.read().await,
};
for (dc, addrs) in map_guard.iter() {
let entry = dc_endpoints.entry(*dc).or_default();
for (ip, port) in addrs.iter().copied() {
entry.push(SocketAddr::new(ip, port)); entry.push(SocketAddr::new(ip, port));
} }
} }
drop(map_guard);
for endpoints in dc_endpoints.values_mut() { for endpoints in dc_endpoints.values_mut() {
endpoints.sort_unstable(); endpoints.sort_unstable();
endpoints.dedup(); endpoints.dedup();
} }
let mut reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len());
if pool.floor_mode() == MeFloorMode::Static { if pool.floor_mode() == MeFloorMode::Static {
adaptive_idle_since.clear(); adaptive_idle_since.clear();
@ -200,6 +205,7 @@ async fn check_family(
required, required,
outage_backoff, outage_backoff,
outage_next_attempt, outage_next_attempt,
&mut reconnect_budget,
) )
.await; .await;
continue; continue;
@ -256,6 +262,24 @@ async fn check_family(
let missing = required - alive; let missing = required - alive;
let now = Instant::now(); let now = Instant::now();
if reconnect_budget == 0 {
let base_ms = pool.me_reconnect_backoff_base.as_millis() as u64;
let next_ms = (*backoff.get(&key).unwrap_or(&base_ms)).max(base_ms);
let jitter = next_ms / JITTER_FRAC_NUM;
let wait = Duration::from_millis(next_ms)
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
next_attempt.insert(key, now + wait);
debug!(
dc = %dc,
?family,
alive,
required,
endpoint_count = endpoints.len(),
reconnect_budget,
"Skipping reconnect due to per-tick health reconnect budget"
);
continue;
}
if let Some(ts) = next_attempt.get(&key) if let Some(ts) = next_attempt.get(&key)
&& now < *ts && now < *ts
{ {
@ -281,6 +305,10 @@ async fn check_family(
let mut restored = 0usize; let mut restored = 0usize;
for _ in 0..missing { for _ in 0..missing {
if reconnect_budget == 0 {
break;
}
reconnect_budget = reconnect_budget.saturating_sub(1);
if pool.floor_mode() == MeFloorMode::Adaptive if pool.floor_mode() == MeFloorMode::Adaptive
&& pool.active_writer_count_total().await >= floor_plan.global_cap_effective_total && pool.active_writer_count_total().await >= floor_plan.global_cap_effective_total
{ {
@ -383,6 +411,15 @@ async fn check_family(
} }
} }
fn health_reconnect_budget(pool: &Arc<MePool>, dc_groups: usize) -> usize {
let cpu_cores = pool.adaptive_floor_effective_cpu_cores().max(1);
let by_cpu = cpu_cores.saturating_mul(HEALTH_RECONNECT_BUDGET_PER_CORE);
let by_dc = dc_groups.saturating_mul(HEALTH_RECONNECT_BUDGET_PER_DC);
by_cpu
.saturating_add(by_dc)
.clamp(HEALTH_RECONNECT_BUDGET_MIN, HEALTH_RECONNECT_BUDGET_MAX)
}
fn adaptive_floor_class_min( fn adaptive_floor_class_min(
pool: &Arc<MePool>, pool: &Arc<MePool>,
endpoint_count: usize, endpoint_count: usize,
@ -816,6 +853,7 @@ async fn recover_single_endpoint_outage(
required: usize, required: usize,
outage_backoff: &mut HashMap<(i32, IpFamily), u64>, outage_backoff: &mut HashMap<(i32, IpFamily), u64>,
outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
reconnect_budget: &mut usize,
) { ) {
let now = Instant::now(); let now = Instant::now();
if let Some(ts) = outage_next_attempt.get(&key) if let Some(ts) = outage_next_attempt.get(&key)
@ -825,6 +863,18 @@ async fn recover_single_endpoint_outage(
} }
let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms(); let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms();
if *reconnect_budget == 0 {
outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250)));
debug!(
dc = %key.0,
family = ?key.1,
%endpoint,
required,
"Single-endpoint outage reconnect deferred by health reconnect budget"
);
return;
}
*reconnect_budget = (*reconnect_budget).saturating_sub(1);
pool.stats pool.stats
.increment_me_single_endpoint_outage_reconnect_attempt_total(); .increment_me_single_endpoint_outage_reconnect_attempt_total();

View File

@ -124,6 +124,7 @@ pub struct MePool {
pub(super) me_adaptive_floor_target_writers_total: AtomicU64, pub(super) me_adaptive_floor_target_writers_total: AtomicU64,
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>, pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>, pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) endpoint_dc_map: Arc<RwLock<HashMap<SocketAddr, Option<i32>>>>,
pub(super) default_dc: AtomicI32, pub(super) default_dc: AtomicI32,
pub(super) next_writer_id: AtomicU64, pub(super) next_writer_id: AtomicU64,
pub(super) ping_tracker: Arc<Mutex<HashMap<i64, (std::time::Instant, u64)>>>, pub(super) ping_tracker: Arc<Mutex<HashMap<i64, (std::time::Instant, u64)>>>,
@ -254,6 +255,7 @@ impl MePool {
me_route_inline_recovery_attempts: u32, me_route_inline_recovery_attempts: u32,
me_route_inline_recovery_wait_ms: u64, me_route_inline_recovery_wait_ms: u64,
) -> Arc<Self> { ) -> Arc<Self> {
let endpoint_dc_map = Self::build_endpoint_dc_map_from_maps(&proxy_map_v4, &proxy_map_v6);
let registry = Arc::new(ConnRegistry::new()); let registry = Arc::new(ConnRegistry::new());
registry.update_route_backpressure_policy( registry.update_route_backpressure_policy(
me_route_backpressure_base_timeout_ms, me_route_backpressure_base_timeout_ms,
@ -355,6 +357,7 @@ impl MePool {
pool_size: 2, pool_size: 2,
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
endpoint_dc_map: Arc::new(RwLock::new(endpoint_dc_map)),
default_dc: AtomicI32::new(default_dc.unwrap_or(2)), default_dc: AtomicI32::new(default_dc.unwrap_or(2)),
next_writer_id: AtomicU64::new(1), next_writer_id: AtomicU64::new(1),
ping_tracker: Arc::new(Mutex::new(HashMap::new())), ping_tracker: Arc::new(Mutex::new(HashMap::new())),

View File

@ -172,7 +172,7 @@ impl MePool {
let target_dc = self.resolve_dc_for_endpoint(addr).await; let target_dc = self.resolve_dc_for_endpoint(addr).await;
if self.decision.ipv4_me { if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone(); let map = self.proxy_map_v4.read().await;
if let Some(addrs) = map.get(&target_dc) { if let Some(addrs) = map.get(&target_dc) {
for (ip, port) in addrs { for (ip, port) in addrs {
endpoints.insert(SocketAddr::new(*ip, *port)); endpoints.insert(SocketAddr::new(*ip, *port));
@ -181,7 +181,7 @@ impl MePool {
} }
if self.decision.ipv6_me { if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone(); let map = self.proxy_map_v6.read().await;
if let Some(addrs) = map.get(&target_dc) { if let Some(addrs) = map.get(&target_dc) {
for (ip, port) in addrs { for (ip, port) in addrs {
endpoints.insert(SocketAddr::new(*ip, *port)); endpoints.insert(SocketAddr::new(*ip, *port));

View File

@ -378,12 +378,16 @@ impl MePool {
return self.has_candidate_for_target_dc(target_dc).await; return self.has_candidate_for_target_dc(target_dc).await;
} }
let remaining = deadline.saturating_duration_since(now);
let sleep_for = remaining.min(Duration::from_millis(25));
let waiter = self.writer_available.notified(); let waiter = self.writer_available.notified();
tokio::select! { if self.has_candidate_for_target_dc(target_dc).await {
_ = waiter => {} return true;
_ = tokio::time::sleep(sleep_for) => {} }
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
return self.has_candidate_for_target_dc(target_dc).await;
}
if tokio::time::timeout(remaining, waiter).await.is_err() {
return self.has_candidate_for_target_dc(target_dc).await;
} }
} }
} }
@ -423,11 +427,11 @@ impl MePool {
self.stats.increment_me_async_recovery_trigger_total(); self.stats.increment_me_async_recovery_trigger_total();
let mut seen = HashSet::<SocketAddr>::new(); let mut seen = HashSet::<SocketAddr>::new();
for family in self.family_order() { for family in self.family_order() {
let map = match family { let map_guard = match family {
IpFamily::V4 => self.proxy_map_v4.read().await.clone(), IpFamily::V4 => self.proxy_map_v4.read().await,
IpFamily::V6 => self.proxy_map_v6.read().await.clone(), IpFamily::V6 => self.proxy_map_v6.read().await,
}; };
for addrs in map.values() { for addrs in map_guard.values() {
for (ip, port) in addrs { for (ip, port) in addrs {
let addr = SocketAddr::new(*ip, *port); let addr = SocketAddr::new(*ip, *port);
if seen.insert(addr) { if seen.insert(addr) {
@ -448,13 +452,13 @@ impl MePool {
let lookup_keys = self.dc_lookup_chain_for_target(key); let lookup_keys = self.dc_lookup_chain_for_target(key);
for family in self.family_order() { for family in self.family_order() {
let map = match family { let map_guard = match family {
IpFamily::V4 => self.proxy_map_v4.read().await.clone(), IpFamily::V4 => self.proxy_map_v4.read().await,
IpFamily::V6 => self.proxy_map_v6.read().await.clone(), IpFamily::V6 => self.proxy_map_v6.read().await,
}; };
let mut family_selected = Vec::<SocketAddr>::new(); let mut family_selected = Vec::<SocketAddr>::new();
for lookup in lookup_keys.iter().copied() { for lookup in lookup_keys.iter().copied() {
if let Some(addrs) = map.get(&lookup) { if let Some(addrs) = map_guard.get(&lookup) {
for (ip, port) in addrs { for (ip, port) in addrs {
family_selected.push(SocketAddr::new(*ip, *port)); family_selected.push(SocketAddr::new(*ip, *port));
} }
@ -557,7 +561,7 @@ impl MePool {
include_warm: bool, include_warm: bool,
) -> Vec<usize> { ) -> Vec<usize> {
let key = target_dc as i32; let key = target_dc as i32;
let mut preferred = Vec::<SocketAddr>::new(); let mut preferred = HashSet::<SocketAddr>::new();
let lookup_keys = self.dc_lookup_chain_for_target(key); let lookup_keys = self.dc_lookup_chain_for_target(key);
for family in self.family_order() { for family in self.family_order() {
@ -574,7 +578,9 @@ impl MePool {
break; break;
} }
} }
preferred.extend(family_selected); for endpoint in family_selected {
preferred.insert(endpoint);
}
drop(map_guard); drop(map_guard);