Safety Gates Invariants + HybridAsyncPersistent + Watch + Runtime Snapshots + ME Writer Ping Tracker + Parallel Recovery + Backpressure Guardrails

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-25 16:29:35 +03:00
parent c43de1bd2a
commit 41d786cc11
No known key found for this signature in database
19 changed files with 384 additions and 184 deletions

View File

@ -651,6 +651,9 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
} }
if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode
|| old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms || old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms
|| old.general.me_route_hybrid_max_wait_ms != new.general.me_route_hybrid_max_wait_ms
|| old.general.me_route_blocking_send_timeout_ms
!= new.general.me_route_blocking_send_timeout_ms
|| old.general.me_route_inline_recovery_attempts || old.general.me_route_inline_recovery_attempts
!= new.general.me_route_inline_recovery_attempts != new.general.me_route_inline_recovery_attempts
|| old.general.me_route_inline_recovery_wait_ms || old.general.me_route_inline_recovery_wait_ms

View File

@ -277,6 +277,8 @@ pub(crate) async fn initialize_me_pool(
config.general.me_warn_rate_limit_ms, config.general.me_warn_rate_limit_ms,
config.general.me_route_no_writer_mode, config.general.me_route_no_writer_mode,
config.general.me_route_no_writer_wait_ms, config.general.me_route_no_writer_wait_ms,
config.general.me_route_hybrid_max_wait_ms,
config.general.me_route_blocking_send_timeout_ms,
config.general.me_route_inline_recovery_attempts, config.general.me_route_inline_recovery_attempts,
config.general.me_route_inline_recovery_wait_ms, config.general.me_route_inline_recovery_wait_ms,
); );

View File

@ -2318,6 +2318,20 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
0 0
} }
); );
let _ = writeln!(
out,
"# HELP telemt_me_hybrid_timeout_total ME hybrid route timeouts after bounded retry window"
);
let _ = writeln!(out, "# TYPE telemt_me_hybrid_timeout_total counter");
let _ = writeln!(
out,
"telemt_me_hybrid_timeout_total {}",
if me_allows_normal {
stats.get_me_hybrid_timeout_total()
} else {
0
}
);
let _ = writeln!( let _ = writeln!(
out, out,
"# HELP telemt_me_async_recovery_trigger_total Async ME recovery trigger attempts from route path" "# HELP telemt_me_async_recovery_trigger_total Async ME recovery trigger attempts from route path"

View File

@ -234,6 +234,7 @@ pub struct Stats {
me_writer_restored_same_endpoint_total: AtomicU64, me_writer_restored_same_endpoint_total: AtomicU64,
me_writer_restored_fallback_total: AtomicU64, me_writer_restored_fallback_total: AtomicU64,
me_no_writer_failfast_total: AtomicU64, me_no_writer_failfast_total: AtomicU64,
me_hybrid_timeout_total: AtomicU64,
me_async_recovery_trigger_total: AtomicU64, me_async_recovery_trigger_total: AtomicU64,
me_inline_recovery_total: AtomicU64, me_inline_recovery_total: AtomicU64,
ip_reservation_rollback_tcp_limit_total: AtomicU64, ip_reservation_rollback_tcp_limit_total: AtomicU64,
@ -1203,6 +1204,12 @@ impl Stats {
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
} }
} }
/// Bump the ME hybrid-route timeout counter.
///
/// The increment is gated on `telemetry_me_allows_normal()`, matching the
/// other ME counters in this impl: when normal ME telemetry is disallowed,
/// the event is dropped rather than recorded.
pub fn increment_me_hybrid_timeout_total(&self) {
    // Telemetry gate: silently skip when ME metrics are suppressed.
    if !self.telemetry_me_allows_normal() {
        return;
    }
    self.me_hybrid_timeout_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_async_recovery_trigger_total(&self) { pub fn increment_me_async_recovery_trigger_total(&self) {
if self.telemetry_me_allows_normal() { if self.telemetry_me_allows_normal() {
self.me_async_recovery_trigger_total self.me_async_recovery_trigger_total
@ -1876,6 +1883,9 @@ impl Stats {
pub fn get_me_no_writer_failfast_total(&self) -> u64 { pub fn get_me_no_writer_failfast_total(&self) -> u64 {
self.me_no_writer_failfast_total.load(Ordering::Relaxed) self.me_no_writer_failfast_total.load(Ordering::Relaxed)
} }
/// Snapshot of the ME hybrid-route timeout counter (relaxed load; a
/// momentarily stale value is acceptable for metrics export).
pub fn get_me_hybrid_timeout_total(&self) -> u64 {
    self.me_hybrid_timeout_total.load(Ordering::Relaxed)
}
pub fn get_me_async_recovery_trigger_total(&self) -> u64 { pub fn get_me_async_recovery_trigger_total(&self) -> u64 {
self.me_async_recovery_trigger_total.load(Ordering::Relaxed) self.me_async_recovery_trigger_total.load(Ordering::Relaxed)
} }

View File

@ -314,53 +314,6 @@ async fn run_update_cycle(
reinit_tx: &mpsc::Sender<MeReinitTrigger>, reinit_tx: &mpsc::Sender<MeReinitTrigger>,
) { ) {
let upstream = pool.upstream.clone(); let upstream = pool.upstream.clone();
pool.update_runtime_reinit_policy(
cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_instadrain,
cfg.general.me_pool_drain_threshold,
cfg.general.me_pool_drain_soft_evict_enabled,
cfg.general.me_pool_drain_soft_evict_grace_secs,
cfg.general.me_pool_drain_soft_evict_per_writer,
cfg.general.me_pool_drain_soft_evict_budget_per_core,
cfg.general.me_pool_drain_soft_evict_cooldown_ms,
cfg.general.effective_me_pool_force_close_secs(),
cfg.general.me_pool_min_fresh_ratio,
cfg.general.me_hardswap_warmup_delay_min_ms,
cfg.general.me_hardswap_warmup_delay_max_ms,
cfg.general.me_hardswap_warmup_extra_passes,
cfg.general.me_hardswap_warmup_pass_backoff_base_ms,
cfg.general.me_bind_stale_mode,
cfg.general.me_bind_stale_ttl_secs,
cfg.general.me_secret_atomic_snapshot,
cfg.general.me_deterministic_writer_sort,
cfg.general.me_writer_pick_mode,
cfg.general.me_writer_pick_sample_size,
cfg.general.me_single_endpoint_shadow_writers,
cfg.general.me_single_endpoint_outage_mode_enabled,
cfg.general.me_single_endpoint_outage_disable_quarantine,
cfg.general.me_single_endpoint_outage_backoff_min_ms,
cfg.general.me_single_endpoint_outage_backoff_max_ms,
cfg.general.me_single_endpoint_shadow_rotate_every_secs,
cfg.general.me_floor_mode,
cfg.general.me_adaptive_floor_idle_secs,
cfg.general.me_adaptive_floor_min_writers_single_endpoint,
cfg.general.me_adaptive_floor_min_writers_multi_endpoint,
cfg.general.me_adaptive_floor_recover_grace_secs,
cfg.general.me_adaptive_floor_writers_per_core_total,
cfg.general.me_adaptive_floor_cpu_cores_override,
cfg.general
.me_adaptive_floor_max_extra_writers_single_per_core,
cfg.general
.me_adaptive_floor_max_extra_writers_multi_per_core,
cfg.general.me_adaptive_floor_max_active_writers_per_core,
cfg.general.me_adaptive_floor_max_warm_writers_per_core,
cfg.general.me_adaptive_floor_max_active_writers_global,
cfg.general.me_adaptive_floor_max_warm_writers_global,
cfg.general.me_health_interval_ms_unhealthy,
cfg.general.me_health_interval_ms_healthy,
cfg.general.me_warn_rate_limit_ms,
);
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1); let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
let required_secret_snapshots = cfg.general.proxy_secret_stable_snapshots.max(1); let required_secret_snapshots = cfg.general.proxy_secret_stable_snapshots.max(1);

View File

@ -7,6 +7,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use rand::RngExt; use rand::RngExt;
use tokio::sync::Semaphore;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use crate::config::MeFloorMode; use crate::config::MeFloorMode;
@ -78,6 +79,7 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
}; };
tokio::time::sleep(interval).await; tokio::time::sleep(interval).await;
pool.prune_closed_writers().await; pool.prune_closed_writers().await;
pool.sweep_endpoint_quarantine().await;
reap_draining_writers(&pool, &mut drain_warn_next_allowed).await; reap_draining_writers(&pool, &mut drain_warn_next_allowed).await;
let v4_degraded = check_family( let v4_degraded = check_family(
IpFamily::V4, IpFamily::V4,
@ -365,7 +367,8 @@ async fn check_family(
endpoints.sort_unstable(); endpoints.sort_unstable();
endpoints.dedup(); endpoints.dedup();
} }
let mut reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len()); let reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len());
let reconnect_sem = Arc::new(Semaphore::new(reconnect_budget));
if pool.floor_mode() == MeFloorMode::Static { if pool.floor_mode() == MeFloorMode::Static {
adaptive_idle_since.clear(); adaptive_idle_since.clear();
@ -461,7 +464,7 @@ async fn check_family(
required, required,
outage_backoff, outage_backoff,
outage_next_attempt, outage_next_attempt,
&mut reconnect_budget, &reconnect_sem,
) )
.await; .await;
continue; continue;
@ -521,7 +524,7 @@ async fn check_family(
family_degraded = true; family_degraded = true;
let now = Instant::now(); let now = Instant::now();
if reconnect_budget == 0 { if reconnect_sem.available_permits() == 0 {
let base_ms = pool.me_reconnect_backoff_base.as_millis() as u64; let base_ms = pool.me_reconnect_backoff_base.as_millis() as u64;
let next_ms = (*backoff.get(&key).unwrap_or(&base_ms)).max(base_ms); let next_ms = (*backoff.get(&key).unwrap_or(&base_ms)).max(base_ms);
let jitter = next_ms / JITTER_FRAC_NUM; let jitter = next_ms / JITTER_FRAC_NUM;
@ -567,10 +570,9 @@ async fn check_family(
let mut restored = 0usize; let mut restored = 0usize;
for _ in 0..missing { for _ in 0..missing {
if reconnect_budget == 0 { let Ok(reconnect_permit) = reconnect_sem.clone().try_acquire_owned() else {
break; break;
} };
reconnect_budget = reconnect_budget.saturating_sub(1);
if pool.active_contour_writer_count_total().await if pool.active_contour_writer_count_total().await
>= floor_plan.active_cap_effective_total >= floor_plan.active_cap_effective_total
{ {
@ -621,6 +623,7 @@ async fn check_family(
debug!(dc = %dc, ?family, "ME reconnect timed out"); debug!(dc = %dc, ?family, "ME reconnect timed out");
} }
} }
drop(reconnect_permit);
} }
let now_alive = alive + restored; let now_alive = alive + restored;
@ -1188,7 +1191,7 @@ async fn recover_single_endpoint_outage(
required: usize, required: usize,
outage_backoff: &mut HashMap<(i32, IpFamily), u64>, outage_backoff: &mut HashMap<(i32, IpFamily), u64>,
outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
reconnect_budget: &mut usize, reconnect_sem: &Arc<Semaphore>,
) { ) {
let now = Instant::now(); let now = Instant::now();
if let Some(ts) = outage_next_attempt.get(&key) if let Some(ts) = outage_next_attempt.get(&key)
@ -1198,7 +1201,7 @@ async fn recover_single_endpoint_outage(
} }
let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms(); let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms();
if *reconnect_budget == 0 { if reconnect_sem.available_permits() == 0 {
outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250))); outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250)));
debug!( debug!(
dc = %key.0, dc = %key.0,
@ -1209,7 +1212,17 @@ async fn recover_single_endpoint_outage(
); );
return; return;
} }
*reconnect_budget = (*reconnect_budget).saturating_sub(1); let Ok(_reconnect_permit) = reconnect_sem.clone().try_acquire_owned() else {
outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250)));
debug!(
dc = %key.0,
family = ?key.1,
%endpoint,
required,
"Single-endpoint outage reconnect deferred by semaphore saturation"
);
return;
};
pool.stats pool.stats
.increment_me_single_endpoint_outage_reconnect_attempt_total(); .increment_me_single_endpoint_outage_reconnect_attempt_total();
@ -1687,6 +1700,8 @@ mod tests {
general.me_warn_rate_limit_ms, general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(), MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms, general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms, general.me_route_inline_recovery_wait_ms,
) )

View File

@ -8,7 +8,8 @@ use std::sync::atomic::{
}; };
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex, Notify, RwLock, mpsc}; use arc_swap::ArcSwap;
use tokio::sync::{Mutex, RwLock, mpsc, watch};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::config::{ use crate::config::{
@ -69,6 +70,10 @@ impl WriterContour {
} }
pub(super) fn from_u8(value: u8) -> Self { pub(super) fn from_u8(value: u8) -> Self {
debug_assert!(
value <= Self::Draining as u8,
"Unexpected WriterContour discriminant: {value}"
);
match value { match value {
0 => Self::Warm, 0 => Self::Warm,
1 => Self::Active, 1 => Self::Active,
@ -87,6 +92,33 @@ pub(crate) enum MeFamilyRuntimeState {
Recovering = 3, Recovering = 3,
} }
/// Point-in-time health view for one ME IP family.
///
/// Stored behind `ArcSwap` on `MePool` (`family_health_v4` /
/// `family_health_v6`), so a state transition replaces the entire snapshot
/// atomically and lock-free readers never observe a torn mix of fields.
#[derive(Debug, Clone)]
pub(crate) struct FamilyHealthSnapshot {
    // Current runtime state for the family (e.g. Healthy … Recovering).
    pub(crate) state: MeFamilyRuntimeState,
    // Epoch seconds at which `state` was last entered.
    pub(crate) state_since_epoch_secs: u64,
    // Epoch seconds until which the family is suppressed; initialized to 0
    // (no suppression window).
    pub(crate) suppressed_until_epoch_secs: u64,
    // Failure streak counter maintained by the health monitor.
    pub(crate) fail_streak: u32,
    // Success streak counter used while the family is recovering.
    pub(crate) recover_success_streak: u32,
}

impl FamilyHealthSnapshot {
    /// Bundle the five tracked values into a snapshot; callers wrap the
    /// result in an `Arc` before storing it into the pool's `ArcSwap` slot.
    fn new(
        state: MeFamilyRuntimeState,
        state_since_epoch_secs: u64,
        suppressed_until_epoch_secs: u64,
        fail_streak: u32,
        recover_success_streak: u32,
    ) -> Self {
        Self {
            state,
            state_since_epoch_secs,
            suppressed_until_epoch_secs,
            fail_streak,
            recover_success_streak,
        }
    }
}
impl MeFamilyRuntimeState { impl MeFamilyRuntimeState {
pub(crate) fn from_u8(value: u8) -> Self { pub(crate) fn from_u8(value: u8) -> Self {
match value { match value {
@ -214,13 +246,11 @@ pub struct MePool {
pub(super) endpoint_dc_map: Arc<RwLock<HashMap<SocketAddr, Option<i32>>>>, pub(super) endpoint_dc_map: Arc<RwLock<HashMap<SocketAddr, Option<i32>>>>,
pub(super) default_dc: AtomicI32, pub(super) default_dc: AtomicI32,
pub(super) next_writer_id: AtomicU64, pub(super) next_writer_id: AtomicU64,
pub(super) ping_tracker: Arc<Mutex<HashMap<i64, (std::time::Instant, u64)>>>,
pub(super) ping_tracker_last_cleanup_epoch_ms: AtomicU64,
pub(super) rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>, pub(super) rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>, pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
pub(super) nat_reflection_singleflight_v4: Arc<Mutex<()>>, pub(super) nat_reflection_singleflight_v4: Arc<Mutex<()>>,
pub(super) nat_reflection_singleflight_v6: Arc<Mutex<()>>, pub(super) nat_reflection_singleflight_v6: Arc<Mutex<()>>,
pub(super) writer_available: Arc<Notify>, pub(super) writer_epoch: watch::Sender<u64>,
pub(super) refill_inflight: Arc<Mutex<HashSet<RefillEndpointKey>>>, pub(super) refill_inflight: Arc<Mutex<HashSet<RefillEndpointKey>>>,
pub(super) refill_inflight_dc: Arc<Mutex<HashSet<RefillDcKey>>>, pub(super) refill_inflight_dc: Arc<Mutex<HashSet<RefillDcKey>>>,
pub(super) conn_count: AtomicUsize, pub(super) conn_count: AtomicUsize,
@ -259,21 +289,18 @@ pub struct MePool {
pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>, pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>,
pub(super) me_route_no_writer_mode: AtomicU8, pub(super) me_route_no_writer_mode: AtomicU8,
pub(super) me_route_no_writer_wait: Duration, pub(super) me_route_no_writer_wait: Duration,
pub(super) me_route_hybrid_max_wait: Duration,
pub(super) me_route_blocking_send_timeout: Option<Duration>,
pub(super) me_route_last_success_epoch_ms: AtomicU64,
pub(super) me_route_hybrid_timeout_warn_epoch_ms: AtomicU64,
pub(super) me_async_recovery_last_trigger_epoch_ms: AtomicU64,
pub(super) me_route_inline_recovery_attempts: u32, pub(super) me_route_inline_recovery_attempts: u32,
pub(super) me_route_inline_recovery_wait: Duration, pub(super) me_route_inline_recovery_wait: Duration,
pub(super) me_health_interval_ms_unhealthy: AtomicU64, pub(super) me_health_interval_ms_unhealthy: AtomicU64,
pub(super) me_health_interval_ms_healthy: AtomicU64, pub(super) me_health_interval_ms_healthy: AtomicU64,
pub(super) me_warn_rate_limit_ms: AtomicU64, pub(super) me_warn_rate_limit_ms: AtomicU64,
pub(super) me_family_v4_runtime_state: AtomicU8, pub(super) family_health_v4: ArcSwap<FamilyHealthSnapshot>,
pub(super) me_family_v6_runtime_state: AtomicU8, pub(super) family_health_v6: ArcSwap<FamilyHealthSnapshot>,
pub(super) me_family_v4_state_since_epoch_secs: AtomicU64,
pub(super) me_family_v6_state_since_epoch_secs: AtomicU64,
pub(super) me_family_v4_suppressed_until_epoch_secs: AtomicU64,
pub(super) me_family_v6_suppressed_until_epoch_secs: AtomicU64,
pub(super) me_family_v4_fail_streak: AtomicU32,
pub(super) me_family_v6_fail_streak: AtomicU32,
pub(super) me_family_v4_recover_success_streak: AtomicU32,
pub(super) me_family_v6_recover_success_streak: AtomicU32,
pub(super) me_last_drain_gate_route_quorum_ok: AtomicBool, pub(super) me_last_drain_gate_route_quorum_ok: AtomicBool,
pub(super) me_last_drain_gate_redundancy_ok: AtomicBool, pub(super) me_last_drain_gate_redundancy_ok: AtomicBool,
pub(super) me_last_drain_gate_block_reason: AtomicU8, pub(super) me_last_drain_gate_block_reason: AtomicU8,
@ -396,6 +423,8 @@ impl MePool {
me_warn_rate_limit_ms: u64, me_warn_rate_limit_ms: u64,
me_route_no_writer_mode: MeRouteNoWriterMode, me_route_no_writer_mode: MeRouteNoWriterMode,
me_route_no_writer_wait_ms: u64, me_route_no_writer_wait_ms: u64,
me_route_hybrid_max_wait_ms: u64,
me_route_blocking_send_timeout_ms: u64,
me_route_inline_recovery_attempts: u32, me_route_inline_recovery_attempts: u32,
me_route_inline_recovery_wait_ms: u64, me_route_inline_recovery_wait_ms: u64,
) -> Arc<Self> { ) -> Arc<Self> {
@ -410,6 +439,8 @@ impl MePool {
me_route_backpressure_high_timeout_ms, me_route_backpressure_high_timeout_ms,
me_route_backpressure_high_watermark_pct, me_route_backpressure_high_watermark_pct,
); );
let (writer_epoch, _) = watch::channel(0u64);
let now_epoch_secs = Self::now_epoch_secs();
Arc::new(Self { Arc::new(Self {
registry, registry,
writers: Arc::new(RwLock::new(Vec::new())), writers: Arc::new(RwLock::new(Vec::new())),
@ -527,13 +558,11 @@ impl MePool {
endpoint_dc_map: Arc::new(RwLock::new(endpoint_dc_map)), endpoint_dc_map: Arc::new(RwLock::new(endpoint_dc_map)),
default_dc: AtomicI32::new(default_dc.unwrap_or(2)), default_dc: AtomicI32::new(default_dc.unwrap_or(2)),
next_writer_id: AtomicU64::new(1), next_writer_id: AtomicU64::new(1),
ping_tracker: Arc::new(Mutex::new(HashMap::new())),
ping_tracker_last_cleanup_epoch_ms: AtomicU64::new(0),
rtt_stats: Arc::new(Mutex::new(HashMap::new())), rtt_stats: Arc::new(Mutex::new(HashMap::new())),
nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())), nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())),
nat_reflection_singleflight_v4: Arc::new(Mutex::new(())), nat_reflection_singleflight_v4: Arc::new(Mutex::new(())),
nat_reflection_singleflight_v6: Arc::new(Mutex::new(())), nat_reflection_singleflight_v6: Arc::new(Mutex::new(())),
writer_available: Arc::new(Notify::new()), writer_epoch,
refill_inflight: Arc::new(Mutex::new(HashSet::new())), refill_inflight: Arc::new(Mutex::new(HashSet::new())),
refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())), refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())),
conn_count: AtomicUsize::new(0), conn_count: AtomicUsize::new(0),
@ -585,25 +614,40 @@ impl MePool {
me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)), me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)),
me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()), me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
me_route_hybrid_max_wait: Duration::from_millis(me_route_hybrid_max_wait_ms.max(50)),
me_route_blocking_send_timeout: if me_route_blocking_send_timeout_ms == 0 {
None
} else {
Some(Duration::from_millis(
me_route_blocking_send_timeout_ms.min(5_000),
))
},
me_route_last_success_epoch_ms: AtomicU64::new(0),
me_route_hybrid_timeout_warn_epoch_ms: AtomicU64::new(0),
me_async_recovery_last_trigger_epoch_ms: AtomicU64::new(0),
me_route_inline_recovery_attempts, me_route_inline_recovery_attempts,
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms), me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)), me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)), me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)),
me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)), me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)),
me_family_v4_runtime_state: AtomicU8::new(MeFamilyRuntimeState::Healthy as u8), family_health_v4: ArcSwap::from_pointee(FamilyHealthSnapshot::new(
me_family_v6_runtime_state: AtomicU8::new(MeFamilyRuntimeState::Healthy as u8), MeFamilyRuntimeState::Healthy,
me_family_v4_state_since_epoch_secs: AtomicU64::new(Self::now_epoch_secs()), now_epoch_secs,
me_family_v6_state_since_epoch_secs: AtomicU64::new(Self::now_epoch_secs()), 0,
me_family_v4_suppressed_until_epoch_secs: AtomicU64::new(0), 0,
me_family_v6_suppressed_until_epoch_secs: AtomicU64::new(0), 0,
me_family_v4_fail_streak: AtomicU32::new(0), )),
me_family_v6_fail_streak: AtomicU32::new(0), family_health_v6: ArcSwap::from_pointee(FamilyHealthSnapshot::new(
me_family_v4_recover_success_streak: AtomicU32::new(0), MeFamilyRuntimeState::Healthy,
me_family_v6_recover_success_streak: AtomicU32::new(0), now_epoch_secs,
0,
0,
0,
)),
me_last_drain_gate_route_quorum_ok: AtomicBool::new(false), me_last_drain_gate_route_quorum_ok: AtomicBool::new(false),
me_last_drain_gate_redundancy_ok: AtomicBool::new(false), me_last_drain_gate_redundancy_ok: AtomicBool::new(false),
me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8), me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8),
me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(Self::now_epoch_secs()), me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(now_epoch_secs),
runtime_ready: AtomicBool::new(false), runtime_ready: AtomicBool::new(false),
preferred_endpoints_by_dc: Arc::new(RwLock::new(preferred_endpoints_by_dc)), preferred_endpoints_by_dc: Arc::new(RwLock::new(preferred_endpoints_by_dc)),
}) })
@ -621,6 +665,19 @@ impl MePool {
self.runtime_ready.load(Ordering::Relaxed) self.runtime_ready.load(Ordering::Relaxed)
} }
/// Milliseconds elapsed since the Unix epoch.
///
/// A system clock set before the epoch degrades to 0 instead of panicking.
/// The `as u64` narrowing of the `u128` millisecond count is safe for any
/// realistic wall-clock time.
pub(super) fn now_epoch_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_millis() as u64)
}
/// Advance the writer epoch so `watch` subscribers wake up and re-check
/// writer availability.
///
/// `wrapping_add` keeps the counter well-defined even after `u64` overflow.
/// `send_modify` is used (rather than `send`) because it succeeds even when
/// no receivers are currently subscribed — NOTE(review): confirm that is the
/// intended reason against the tokio `watch` docs.
pub(super) fn notify_writer_epoch(&self) {
    // `send_modify` returns `()`, so there is no result to discard; the
    // previous `let _ =` binding was redundant.
    self.writer_epoch.send_modify(|epoch| {
        *epoch = epoch.wrapping_add(1);
    });
}
#[allow(dead_code)] #[allow(dead_code)]
pub(super) fn set_family_runtime_state( pub(super) fn set_family_runtime_state(
&self, &self,
@ -631,82 +688,51 @@ impl MePool {
fail_streak: u32, fail_streak: u32,
recover_success_streak: u32, recover_success_streak: u32,
) { ) {
let snapshot = Arc::new(FamilyHealthSnapshot::new(
state,
state_since_epoch_secs,
suppressed_until_epoch_secs,
fail_streak,
recover_success_streak,
));
match family { match family {
IpFamily::V4 => { IpFamily::V4 => self.family_health_v4.store(snapshot),
self.me_family_v4_runtime_state IpFamily::V6 => self.family_health_v6.store(snapshot),
.store(state as u8, Ordering::Relaxed);
self.me_family_v4_state_since_epoch_secs
.store(state_since_epoch_secs, Ordering::Relaxed);
self.me_family_v4_suppressed_until_epoch_secs
.store(suppressed_until_epoch_secs, Ordering::Relaxed);
self.me_family_v4_fail_streak
.store(fail_streak, Ordering::Relaxed);
self.me_family_v4_recover_success_streak
.store(recover_success_streak, Ordering::Relaxed);
}
IpFamily::V6 => {
self.me_family_v6_runtime_state
.store(state as u8, Ordering::Relaxed);
self.me_family_v6_state_since_epoch_secs
.store(state_since_epoch_secs, Ordering::Relaxed);
self.me_family_v6_suppressed_until_epoch_secs
.store(suppressed_until_epoch_secs, Ordering::Relaxed);
self.me_family_v6_fail_streak
.store(fail_streak, Ordering::Relaxed);
self.me_family_v6_recover_success_streak
.store(recover_success_streak, Ordering::Relaxed);
}
} }
} }
pub(crate) fn family_runtime_state(&self, family: IpFamily) -> MeFamilyRuntimeState { pub(crate) fn family_runtime_state(&self, family: IpFamily) -> MeFamilyRuntimeState {
match family { match family {
IpFamily::V4 => MeFamilyRuntimeState::from_u8( IpFamily::V4 => self.family_health_v4.load().state,
self.me_family_v4_runtime_state.load(Ordering::Relaxed), IpFamily::V6 => self.family_health_v6.load().state,
),
IpFamily::V6 => MeFamilyRuntimeState::from_u8(
self.me_family_v6_runtime_state.load(Ordering::Relaxed),
),
} }
} }
pub(crate) fn family_runtime_state_since_epoch_secs(&self, family: IpFamily) -> u64 { pub(crate) fn family_runtime_state_since_epoch_secs(&self, family: IpFamily) -> u64 {
match family { match family {
IpFamily::V4 => self IpFamily::V4 => self.family_health_v4.load().state_since_epoch_secs,
.me_family_v4_state_since_epoch_secs IpFamily::V6 => self.family_health_v6.load().state_since_epoch_secs,
.load(Ordering::Relaxed),
IpFamily::V6 => self
.me_family_v6_state_since_epoch_secs
.load(Ordering::Relaxed),
} }
} }
pub(crate) fn family_suppressed_until_epoch_secs(&self, family: IpFamily) -> u64 { pub(crate) fn family_suppressed_until_epoch_secs(&self, family: IpFamily) -> u64 {
match family { match family {
IpFamily::V4 => self IpFamily::V4 => self.family_health_v4.load().suppressed_until_epoch_secs,
.me_family_v4_suppressed_until_epoch_secs IpFamily::V6 => self.family_health_v6.load().suppressed_until_epoch_secs,
.load(Ordering::Relaxed),
IpFamily::V6 => self
.me_family_v6_suppressed_until_epoch_secs
.load(Ordering::Relaxed),
} }
} }
pub(crate) fn family_fail_streak(&self, family: IpFamily) -> u32 { pub(crate) fn family_fail_streak(&self, family: IpFamily) -> u32 {
match family { match family {
IpFamily::V4 => self.me_family_v4_fail_streak.load(Ordering::Relaxed), IpFamily::V4 => self.family_health_v4.load().fail_streak,
IpFamily::V6 => self.me_family_v6_fail_streak.load(Ordering::Relaxed), IpFamily::V6 => self.family_health_v6.load().fail_streak,
} }
} }
pub(crate) fn family_recover_success_streak(&self, family: IpFamily) -> u32 { pub(crate) fn family_recover_success_streak(&self, family: IpFamily) -> u32 {
match family { match family {
IpFamily::V4 => self IpFamily::V4 => self.family_health_v4.load().recover_success_streak,
.me_family_v4_recover_success_streak IpFamily::V6 => self.family_health_v6.load().recover_success_streak,
.load(Ordering::Relaxed),
IpFamily::V6 => self
.me_family_v6_recover_success_streak
.load(Ordering::Relaxed),
} }
} }
@ -818,6 +844,9 @@ impl MePool {
self.me_instadrain.store(instadrain, Ordering::Relaxed); self.me_instadrain.store(instadrain, Ordering::Relaxed);
self.me_pool_drain_threshold self.me_pool_drain_threshold
.store(pool_drain_threshold, Ordering::Relaxed); .store(pool_drain_threshold, Ordering::Relaxed);
// Runtime soft-evict knobs are updated lock-free to keep control-plane
// writes non-blocking; readers observe a short eventual-consistency
// window by design.
self.me_pool_drain_soft_evict_enabled self.me_pool_drain_soft_evict_enabled
.store(pool_drain_soft_evict_enabled, Ordering::Relaxed); .store(pool_drain_soft_evict_enabled, Ordering::Relaxed);
self.me_pool_drain_soft_evict_grace_secs self.me_pool_drain_soft_evict_grace_secs
@ -1574,6 +1603,22 @@ impl MePool {
let preferred = Self::build_preferred_endpoints_by_dc(&self.decision, &map_v4, &map_v6); let preferred = Self::build_preferred_endpoints_by_dc(&self.decision, &map_v4, &map_v6);
*self.endpoint_dc_map.write().await = rebuilt; *self.endpoint_dc_map.write().await = rebuilt;
*self.preferred_endpoints_by_dc.write().await = preferred; *self.preferred_endpoints_by_dc.write().await = preferred;
let configured_endpoints = self
.endpoint_dc_map
.read()
.await
.keys()
.copied()
.collect::<HashSet<SocketAddr>>();
{
let mut quarantine = self.endpoint_quarantine.lock().await;
let now = Instant::now();
quarantine.retain(|addr, expiry| *expiry > now && configured_endpoints.contains(addr));
}
{
let mut kdf_fp = self.kdf_material_fingerprint.write().await;
kdf_fp.retain(|addr, _| configured_endpoints.contains(addr));
}
} }
pub(super) async fn preferred_endpoints_for_dc(&self, dc: i32) -> Vec<SocketAddr> { pub(super) async fn preferred_endpoints_for_dc(&self, dc: i32) -> Vec<SocketAddr> {

View File

@ -72,7 +72,7 @@ impl MePool {
} }
if changed { if changed {
self.rebuild_endpoint_dc_map().await; self.rebuild_endpoint_dc_map().await;
self.writer_available.notify_waiters(); self.notify_writer_epoch();
} }
if changed { if changed {
SnapshotApplyOutcome::AppliedChanged SnapshotApplyOutcome::AppliedChanged

View File

@ -13,8 +13,22 @@ use super::pool::{MePool, RefillDcKey, RefillEndpointKey, WriterContour};
const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20; const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20;
const ME_FLAP_QUARANTINE_SECS: u64 = 25; const ME_FLAP_QUARANTINE_SECS: u64 = 25;
const ME_REFILL_TOTAL_ATTEMPT_CAP: u32 = 20;
impl MePool { impl MePool {
/// Drop quarantine entries that have expired or whose endpoint is no longer
/// present in the configured endpoint→DC map.
pub(super) async fn sweep_endpoint_quarantine(&self) {
    // Snapshot the configured endpoints inside a scope so the read guard is
    // released before the quarantine mutex is taken.
    let configured: HashSet<SocketAddr> = {
        let dc_map = self.endpoint_dc_map.read().await;
        dc_map.keys().copied().collect()
    };
    let now = Instant::now();
    self.endpoint_quarantine
        .lock()
        .await
        .retain(|addr, expiry| configured.contains(addr) && *expiry > now);
}
pub(super) async fn maybe_quarantine_flapping_endpoint( pub(super) async fn maybe_quarantine_flapping_endpoint(
&self, &self,
addr: SocketAddr, addr: SocketAddr,
@ -206,10 +220,15 @@ impl MePool {
async fn refill_writer_after_loss(self: &Arc<Self>, addr: SocketAddr, writer_dc: i32) -> bool { async fn refill_writer_after_loss(self: &Arc<Self>, addr: SocketAddr, writer_dc: i32) -> bool {
let fast_retries = self.me_reconnect_fast_retry_count.max(1); let fast_retries = self.me_reconnect_fast_retry_count.max(1);
let mut total_attempts = 0u32;
let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await; let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await;
if !same_endpoint_quarantined { if !same_endpoint_quarantined {
for attempt in 0..fast_retries { for attempt in 0..fast_retries {
if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP {
break;
}
total_attempts = total_attempts.saturating_add(1);
self.stats.increment_me_reconnect_attempt(); self.stats.increment_me_reconnect_attempt();
match self match self
.connect_one_for_dc(addr, writer_dc, self.rng.as_ref()) .connect_one_for_dc(addr, writer_dc, self.rng.as_ref())
@ -250,6 +269,10 @@ impl MePool {
} }
for attempt in 0..fast_retries { for attempt in 0..fast_retries {
if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP {
break;
}
total_attempts = total_attempts.saturating_add(1);
self.stats.increment_me_reconnect_attempt(); self.stats.increment_me_reconnect_attempt();
if self if self
.connect_endpoints_round_robin(writer_dc, &dc_endpoints, self.rng.as_ref()) .connect_endpoints_round_robin(writer_dc, &dc_endpoints, self.rng.as_ref())

View File

@ -1,5 +1,6 @@
use std::io::ErrorKind; use std::io::ErrorKind;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -25,6 +26,7 @@ const ME_ACTIVE_PING_SECS: u64 = 25;
const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5; const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700; const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700;
const ME_PING_TRACKER_CLEANUP_EVERY: u32 = 32;
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
enum WriterTeardownMode { enum WriterTeardownMode {
@ -197,11 +199,11 @@ impl MePool {
self.registry.register_writer(writer_id, tx.clone()).await; self.registry.register_writer(writer_id, tx.clone()).await;
self.registry.mark_writer_idle(writer_id).await; self.registry.mark_writer_idle(writer_id).await;
self.conn_count.fetch_add(1, Ordering::Relaxed); self.conn_count.fetch_add(1, Ordering::Relaxed);
self.writer_available.notify_one(); self.notify_writer_epoch();
let reg = self.registry.clone(); let reg = self.registry.clone();
let writers_arc = self.writers_arc(); let writers_arc = self.writers_arc();
let ping_tracker = self.ping_tracker.clone(); let ping_tracker = Arc::new(tokio::sync::Mutex::new(HashMap::<i64, Instant>::new()));
let ping_tracker_reader = ping_tracker.clone(); let ping_tracker_reader = ping_tracker.clone();
let rtt_stats = self.rtt_stats.clone(); let rtt_stats = self.rtt_stats.clone();
let stats_reader = self.stats.clone(); let stats_reader = self.stats.clone();
@ -280,6 +282,7 @@ impl MePool {
let pool_ping = Arc::downgrade(self); let pool_ping = Arc::downgrade(self);
tokio::spawn(async move { tokio::spawn(async move {
let mut ping_id: i64 = rand::random::<i64>(); let mut ping_id: i64 = rand::random::<i64>();
let mut cleanup_tick: u32 = 0;
let idle_interval_cap = Duration::from_secs(ME_IDLE_KEEPALIVE_MAX_SECS); let idle_interval_cap = Duration::from_secs(ME_IDLE_KEEPALIVE_MAX_SECS);
// Per-writer jittered start to avoid phase sync. // Per-writer jittered start to avoid phase sync.
let startup_jitter = if keepalive_enabled { let startup_jitter = if keepalive_enabled {
@ -339,39 +342,16 @@ impl MePool {
p.extend_from_slice(&sent_id.to_le_bytes()); p.extend_from_slice(&sent_id.to_le_bytes());
{ {
let mut tracker = ping_tracker_ping.lock().await; let mut tracker = ping_tracker_ping.lock().await;
let now_epoch_ms = std::time::SystemTime::now() cleanup_tick = cleanup_tick.wrapping_add(1);
.duration_since(std::time::UNIX_EPOCH) if cleanup_tick.is_multiple_of(ME_PING_TRACKER_CLEANUP_EVERY) {
.unwrap_or_default()
.as_millis() as u64;
let mut run_cleanup = false;
if let Some(pool) = pool_ping.upgrade() {
let last_cleanup_ms = pool
.ping_tracker_last_cleanup_epoch_ms
.load(Ordering::Relaxed);
if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000
&& pool
.ping_tracker_last_cleanup_epoch_ms
.compare_exchange(
last_cleanup_ms,
now_epoch_ms,
Ordering::AcqRel,
Ordering::Relaxed,
)
.is_ok()
{
run_cleanup = true;
}
}
if run_cleanup {
let before = tracker.len(); let before = tracker.len();
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120)); tracker.retain(|_, ts| ts.elapsed() < Duration::from_secs(120));
let expired = before.saturating_sub(tracker.len()); let expired = before.saturating_sub(tracker.len());
if expired > 0 { if expired > 0 {
stats_ping.increment_me_keepalive_timeout_by(expired as u64); stats_ping.increment_me_keepalive_timeout_by(expired as u64);
} }
} }
tracker.insert(sent_id, (std::time::Instant::now(), writer_id)); tracker.insert(sent_id, std::time::Instant::now());
} }
ping_id = ping_id.wrapping_add(1); ping_id = ping_id.wrapping_add(1);
stats_ping.increment_me_keepalive_sent(); stats_ping.increment_me_keepalive_sent();
@ -594,10 +574,6 @@ impl MePool {
// The close command below is only a best-effort accelerator for task shutdown. // The close command below is only a best-effort accelerator for task shutdown.
// Cleanup progress must never depend on command-channel availability. // Cleanup progress must never depend on command-channel availability.
let _ = self.registry.writer_lost(writer_id).await; let _ = self.registry.writer_lost(writer_id).await;
{
let mut tracker = self.ping_tracker.lock().await;
tracker.retain(|_, (_, wid)| *wid != writer_id);
}
self.rtt_stats.lock().await.remove(&writer_id); self.rtt_stats.lock().await.remove(&writer_id);
if let Some(tx) = close_tx { if let Some(tx) = close_tx {
let _ = tx.send(WriterCommand::Close).await; let _ = tx.send(WriterCommand::Close).await;
@ -611,6 +587,9 @@ impl MePool {
self.trigger_immediate_refill_for_dc(addr, writer_dc); self.trigger_immediate_refill_for_dc(addr, writer_dc);
} }
} }
if removed {
self.notify_writer_epoch();
}
removed removed
} }

View File

@ -32,10 +32,10 @@ pub(crate) async fn reader_loop(
enc_leftover: BytesMut, enc_leftover: BytesMut,
mut dec: BytesMut, mut dec: BytesMut,
tx: mpsc::Sender<WriterCommand>, tx: mpsc::Sender<WriterCommand>,
ping_tracker: Arc<Mutex<HashMap<i64, (Instant, u64)>>>, ping_tracker: Arc<Mutex<HashMap<i64, Instant>>>,
rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>, rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
stats: Arc<Stats>, stats: Arc<Stats>,
_writer_id: u64, writer_id: u64,
degraded: Arc<AtomicBool>, degraded: Arc<AtomicBool>,
writer_rtt_ema_ms_x10: Arc<AtomicU32>, writer_rtt_ema_ms_x10: Arc<AtomicU32>,
reader_route_data_wait_ms: Arc<AtomicU64>, reader_route_data_wait_ms: Arc<AtomicU64>,
@ -45,7 +45,7 @@ pub(crate) async fn reader_loop(
let mut expected_seq: i32 = 0; let mut expected_seq: i32 = 0;
loop { loop {
let mut tmp = [0u8; 16_384]; let mut tmp = [0u8; 65_536];
let n = tokio::select! { let n = tokio::select! {
res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?, res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?,
_ = cancel.cancelled() => return Ok(()), _ = cancel.cancelled() => return Ok(()),
@ -203,13 +203,13 @@ pub(crate) async fn reader_loop(
} else if pt == RPC_PONG_U32 && body.len() >= 8 { } else if pt == RPC_PONG_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
stats.increment_me_keepalive_pong(); stats.increment_me_keepalive_pong();
if let Some((sent, wid)) = { if let Some(sent) = {
let mut guard = ping_tracker.lock().await; let mut guard = ping_tracker.lock().await;
guard.remove(&ping_id) guard.remove(&ping_id)
} { } {
let rtt = sent.elapsed().as_secs_f64() * 1000.0; let rtt = sent.elapsed().as_secs_f64() * 1000.0;
let mut stats = rtt_stats.lock().await; let mut stats = rtt_stats.lock().await;
let entry = stats.entry(wid).or_insert((rtt, rtt)); let entry = stats.entry(writer_id).or_insert((rtt, rtt));
entry.1 = entry.1 * 0.8 + rtt * 0.2; entry.1 = entry.1 * 0.8 + rtt * 0.2;
if rtt < entry.0 { if rtt < entry.0 {
entry.0 = rtt; entry.0 = rtt;
@ -224,7 +224,7 @@ pub(crate) async fn reader_loop(
Ordering::Relaxed, Ordering::Relaxed,
); );
trace!( trace!(
writer_id = wid, writer_id,
rtt_ms = rtt, rtt_ms = rtt,
ema_ms = entry.1, ema_ms = entry.1,
base_ms = entry.0, base_ms = entry.0,

View File

@ -292,6 +292,12 @@ impl ConnRegistry {
pub async fn bind_writer(&self, conn_id: u64, writer_id: u64, meta: ConnMeta) -> bool { pub async fn bind_writer(&self, conn_id: u64, writer_id: u64, meta: ConnMeta) -> bool {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
// ROUTING IS THE SOURCE OF TRUTH:
// never keep/attach writer binding for a connection that is already
// absent from the routing table.
if !inner.map.contains_key(&conn_id) {
return false;
}
if !inner.writers.contains_key(&writer_id) { if !inner.writers.contains_key(&writer_id) {
return false; return false;
} }
@ -382,9 +388,39 @@ impl ConnRegistry {
} }
pub async fn get_writer(&self, conn_id: u64) -> Option<ConnWriter> { pub async fn get_writer(&self, conn_id: u64) -> Option<ConnWriter> {
let inner = self.inner.read().await; let mut inner = self.inner.write().await;
let writer_id = inner.writer_for_conn.get(&conn_id).cloned()?; // ROUTING IS THE SOURCE OF TRUTH:
let writer = inner.writers.get(&writer_id).cloned()?; // stale bindings are ignored and lazily cleaned when routing no longer
// contains the connection.
if !inner.map.contains_key(&conn_id) {
inner.meta.remove(&conn_id);
if let Some(stale_writer_id) = inner.writer_for_conn.remove(&conn_id)
&& let Some(conns) = inner.conns_for_writer.get_mut(&stale_writer_id)
{
conns.remove(&conn_id);
if conns.is_empty() {
inner
.writer_idle_since_epoch_secs
.insert(stale_writer_id, Self::now_epoch_secs());
}
}
return None;
}
let writer_id = inner.writer_for_conn.get(&conn_id).copied()?;
let Some(writer) = inner.writers.get(&writer_id).cloned() else {
inner.writer_for_conn.remove(&conn_id);
inner.meta.remove(&conn_id);
if let Some(conns) = inner.conns_for_writer.get_mut(&writer_id) {
conns.remove(&conn_id);
if conns.is_empty() {
inner
.writer_idle_since_epoch_secs
.insert(writer_id, Self::now_epoch_secs());
}
}
return None;
};
Some(ConnWriter { Some(ConnWriter {
writer_id, writer_id,
tx: writer, tx: writer,

View File

@ -26,6 +26,9 @@ use rand::seq::SliceRandom;
const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45; const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45;
const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55; const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55;
const HYBRID_GLOBAL_BURST_PERIOD_ROUNDS: u32 = 4; const HYBRID_GLOBAL_BURST_PERIOD_ROUNDS: u32 = 4;
const HYBRID_RECENT_SUCCESS_WINDOW_MS: u64 = 120_000;
const HYBRID_TIMEOUT_WARN_RATE_LIMIT_MS: u64 = 5_000;
const HYBRID_RECOVERY_TRIGGER_MIN_INTERVAL_MS: u64 = 5_000;
const PICK_PENALTY_WARM: u64 = 200; const PICK_PENALTY_WARM: u64 = 200;
const PICK_PENALTY_DRAINING: u64 = 600; const PICK_PENALTY_DRAINING: u64 = 600;
const PICK_PENALTY_STALE: u64 = 300; const PICK_PENALTY_STALE: u64 = 300;
@ -77,6 +80,7 @@ impl MePool {
let mut async_recovery_triggered = false; let mut async_recovery_triggered = false;
let mut hybrid_recovery_round = 0u32; let mut hybrid_recovery_round = 0u32;
let mut hybrid_last_recovery_at: Option<Instant> = None; let mut hybrid_last_recovery_at: Option<Instant> = None;
let mut hybrid_total_deadline: Option<Instant> = None;
let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50)); let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50));
let mut hybrid_wait_current = hybrid_wait_step; let mut hybrid_wait_current = hybrid_wait_step;
@ -92,9 +96,13 @@ impl MePool {
.tx .tx
.try_send(WriterCommand::Data(current_payload.clone())) .try_send(WriterCommand::Data(current_payload.clone()))
{ {
Ok(()) => return Ok(()), Ok(()) => {
self.note_hybrid_route_success();
return Ok(());
}
Err(TrySendError::Full(cmd)) => { Err(TrySendError::Full(cmd)) => {
if current.tx.send(cmd).await.is_ok() { if current.tx.send(cmd).await.is_ok() {
self.note_hybrid_route_success();
return Ok(()); return Ok(());
} }
warn!(writer_id = current.writer_id, "ME writer channel closed"); warn!(writer_id = current.writer_id, "ME writer channel closed");
@ -182,6 +190,15 @@ impl MePool {
continue; continue;
} }
MeRouteNoWriterMode::HybridAsyncPersistent => { MeRouteNoWriterMode::HybridAsyncPersistent => {
let total_deadline = *hybrid_total_deadline.get_or_insert_with(|| {
Instant::now() + self.hybrid_total_wait_budget()
});
if Instant::now() >= total_deadline {
self.on_hybrid_timeout(total_deadline, routed_dc);
return Err(ProxyError::Proxy(
"ME writer not available within hybrid timeout".into(),
));
}
if !unknown_target_dc { if !unknown_target_dc {
self.maybe_trigger_hybrid_recovery( self.maybe_trigger_hybrid_recovery(
routed_dc, routed_dc,
@ -292,6 +309,15 @@ impl MePool {
} }
} }
MeRouteNoWriterMode::HybridAsyncPersistent => { MeRouteNoWriterMode::HybridAsyncPersistent => {
let total_deadline = *hybrid_total_deadline
.get_or_insert_with(|| Instant::now() + self.hybrid_total_wait_budget());
if Instant::now() >= total_deadline {
self.on_hybrid_timeout(total_deadline, routed_dc);
return Err(ProxyError::Proxy(
"No ME writers available for target DC within hybrid timeout"
.into(),
));
}
if !unknown_target_dc { if !unknown_target_dc {
self.maybe_trigger_hybrid_recovery( self.maybe_trigger_hybrid_recovery(
routed_dc, routed_dc,
@ -423,6 +449,7 @@ impl MePool {
"Selected stale ME writer for fallback bind" "Selected stale ME writer for fallback bind"
); );
} }
self.note_hybrid_route_success();
return Ok(()); return Ok(());
} }
Err(TrySendError::Full(_)) => { Err(TrySendError::Full(_)) => {
@ -453,7 +480,18 @@ impl MePool {
.increment_me_writer_pick_blocking_fallback_total(); .increment_me_writer_pick_blocking_fallback_total();
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port()); let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
let (payload, meta) = build_routed_payload(effective_our_addr); let (payload, meta) = build_routed_payload(effective_our_addr);
match w.tx.clone().reserve_owned().await { let reserve_result = if let Some(timeout) = self.me_route_blocking_send_timeout {
match tokio::time::timeout(timeout, w.tx.clone().reserve_owned()).await {
Ok(result) => result,
Err(_) => {
self.stats.increment_me_writer_pick_full_total(pick_mode);
continue;
}
}
} else {
w.tx.clone().reserve_owned().await
};
match reserve_result {
Ok(permit) => { Ok(permit) => {
if !self.registry.bind_writer(conn_id, w.id, meta).await { if !self.registry.bind_writer(conn_id, w.id, meta).await {
debug!( debug!(
@ -471,6 +509,7 @@ impl MePool {
if w.generation < self.current_generation() { if w.generation < self.current_generation() {
self.stats.increment_pool_stale_pick_total(); self.stats.increment_pool_stale_pick_total();
} }
self.note_hybrid_route_success();
return Ok(()); return Ok(());
} }
Err(_) => { Err(_) => {
@ -483,7 +522,7 @@ impl MePool {
} }
async fn wait_for_writer_until(&self, deadline: Instant) -> bool { async fn wait_for_writer_until(&self, deadline: Instant) -> bool {
let waiter = self.writer_available.notified(); let mut rx = self.writer_epoch.subscribe();
if !self.writers.read().await.is_empty() { if !self.writers.read().await.is_empty() {
return true; return true;
} }
@ -492,13 +531,14 @@ impl MePool {
return !self.writers.read().await.is_empty(); return !self.writers.read().await.is_empty();
} }
let timeout = deadline.saturating_duration_since(now); let timeout = deadline.saturating_duration_since(now);
if tokio::time::timeout(timeout, waiter).await.is_ok() { if tokio::time::timeout(timeout, rx.changed()).await.is_ok() {
return true; return !self.writers.read().await.is_empty();
} }
!self.writers.read().await.is_empty() !self.writers.read().await.is_empty()
} }
async fn wait_for_candidate_until(&self, routed_dc: i32, deadline: Instant) -> bool { async fn wait_for_candidate_until(&self, routed_dc: i32, deadline: Instant) -> bool {
let mut rx = self.writer_epoch.subscribe();
loop { loop {
if self.has_candidate_for_target_dc(routed_dc).await { if self.has_candidate_for_target_dc(routed_dc).await {
return true; return true;
@ -509,7 +549,6 @@ impl MePool {
return self.has_candidate_for_target_dc(routed_dc).await; return self.has_candidate_for_target_dc(routed_dc).await;
} }
let waiter = self.writer_available.notified();
if self.has_candidate_for_target_dc(routed_dc).await { if self.has_candidate_for_target_dc(routed_dc).await {
return true; return true;
} }
@ -517,7 +556,7 @@ impl MePool {
if remaining.is_zero() { if remaining.is_zero() {
return self.has_candidate_for_target_dc(routed_dc).await; return self.has_candidate_for_target_dc(routed_dc).await;
} }
if tokio::time::timeout(remaining, waiter).await.is_err() { if tokio::time::timeout(remaining, rx.changed()).await.is_err() {
return self.has_candidate_for_target_dc(routed_dc).await; return self.has_candidate_for_target_dc(routed_dc).await;
} }
} }
@ -587,6 +626,10 @@ impl MePool {
hybrid_last_recovery_at: &mut Option<Instant>, hybrid_last_recovery_at: &mut Option<Instant>,
hybrid_wait_step: Duration, hybrid_wait_step: Duration,
) { ) {
if !self.try_consume_hybrid_recovery_trigger_slot(HYBRID_RECOVERY_TRIGGER_MIN_INTERVAL_MS)
{
return;
}
if let Some(last) = *hybrid_last_recovery_at if let Some(last) = *hybrid_last_recovery_at
&& last.elapsed() < hybrid_wait_step && last.elapsed() < hybrid_wait_step
{ {
@ -602,6 +645,71 @@ impl MePool {
*hybrid_last_recovery_at = Some(Instant::now()); *hybrid_last_recovery_at = Some(Instant::now());
} }
fn hybrid_total_wait_budget(&self) -> Duration {
let base = self.me_route_hybrid_max_wait.max(Duration::from_millis(50));
let now_ms = Self::now_epoch_millis();
let last_success_ms = self.me_route_last_success_epoch_ms.load(Ordering::Relaxed);
if last_success_ms != 0
&& now_ms.saturating_sub(last_success_ms) <= HYBRID_RECENT_SUCCESS_WINDOW_MS
{
return base.saturating_mul(2);
}
base
}
fn note_hybrid_route_success(&self) {
self.me_route_last_success_epoch_ms
.store(Self::now_epoch_millis(), Ordering::Relaxed);
}
fn on_hybrid_timeout(&self, deadline: Instant, routed_dc: i32) {
self.stats.increment_me_hybrid_timeout_total();
let now_ms = Self::now_epoch_millis();
let mut last_warn_ms = self
.me_route_hybrid_timeout_warn_epoch_ms
.load(Ordering::Relaxed);
while now_ms.saturating_sub(last_warn_ms) >= HYBRID_TIMEOUT_WARN_RATE_LIMIT_MS {
match self.me_route_hybrid_timeout_warn_epoch_ms.compare_exchange_weak(
last_warn_ms,
now_ms,
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => {
warn!(
routed_dc,
budget_ms = self.hybrid_total_wait_budget().as_millis() as u64,
elapsed_ms = deadline.elapsed().as_millis() as u64,
"ME hybrid route timeout reached"
);
break;
}
Err(actual) => last_warn_ms = actual,
}
}
}
fn try_consume_hybrid_recovery_trigger_slot(&self, min_interval_ms: u64) -> bool {
let now_ms = Self::now_epoch_millis();
let mut last_trigger_ms = self
.me_async_recovery_last_trigger_epoch_ms
.load(Ordering::Relaxed);
loop {
if now_ms.saturating_sub(last_trigger_ms) < min_interval_ms {
return false;
}
match self.me_async_recovery_last_trigger_epoch_ms.compare_exchange_weak(
last_trigger_ms,
now_ms,
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => return true,
Err(actual) => last_trigger_ms = actual,
}
}
}
pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> { pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> {
if let Some(w) = self.registry.get_writer(conn_id).await { if let Some(w) = self.registry.get_writer(conn_id).await {
let mut p = Vec::with_capacity(12); let mut p = Vec::with_capacity(12);

View File

@ -113,6 +113,8 @@ async fn make_pool(
general.me_warn_rate_limit_ms, general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(), MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms, general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms, general.me_route_inline_recovery_wait_ms,
); );

View File

@ -111,6 +111,8 @@ async fn make_pool(
general.me_warn_rate_limit_ms, general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(), MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms, general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms, general.me_route_inline_recovery_wait_ms,
); );

View File

@ -106,6 +106,8 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
general.me_warn_rate_limit_ms, general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(), MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms, general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms, general.me_route_inline_recovery_wait_ms,
) )

View File

@ -95,6 +95,8 @@ async fn make_pool() -> Arc<MePool> {
general.me_warn_rate_limit_ms, general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(), MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms, general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms, general.me_route_inline_recovery_wait_ms,
) )

View File

@ -100,6 +100,8 @@ async fn make_pool() -> Arc<MePool> {
general.me_warn_rate_limit_ms, general.me_warn_rate_limit_ms,
MeRouteNoWriterMode::default(), MeRouteNoWriterMode::default(),
general.me_route_no_writer_wait_ms, general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms, general.me_route_inline_recovery_wait_ms,
) )

View File

@ -106,6 +106,8 @@ async fn make_pool() -> (Arc<MePool>, Arc<SecureRandom>) {
general.me_warn_rate_limit_ms, general.me_warn_rate_limit_ms,
general.me_route_no_writer_mode, general.me_route_no_writer_mode,
general.me_route_no_writer_wait_ms, general.me_route_no_writer_wait_ms,
general.me_route_hybrid_max_wait_ms,
general.me_route_blocking_send_timeout_ms,
general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_attempts,
general.me_route_inline_recovery_wait_ms, general.me_route_inline_recovery_wait_ms,
); );