Floor Runtime + Writer Selection Policy + Reconnect/Warmup + TransportPolicy + NAT Runtime Cores

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-03-25 20:55:20 +03:00
parent 7ce5fc66db
commit ceae1564af
14 changed files with 502 additions and 296 deletions

View File

@@ -153,7 +153,8 @@ impl MePool {
MeRouteNoWriterMode::InlineRecoveryLegacy => {
self.stats.increment_me_inline_recovery_total();
if !unknown_target_dc {
for _ in 0..self.route_runtime.me_route_inline_recovery_attempts.max(1)
for _ in
0..self.route_runtime.me_route_inline_recovery_attempts.max(1)
{
for family in self.family_order() {
let map = match family {
@@ -319,8 +320,9 @@ impl MePool {
}
}
MeRouteNoWriterMode::HybridAsyncPersistent => {
let total_deadline = *hybrid_total_deadline
.get_or_insert_with(|| Instant::now() + self.hybrid_total_wait_budget());
let total_deadline = *hybrid_total_deadline.get_or_insert_with(|| {
Instant::now() + self.hybrid_total_wait_budget()
});
if Instant::now() >= total_deadline {
self.on_hybrid_timeout(total_deadline, routed_dc);
return Err(ProxyError::Proxy(
@@ -368,7 +370,11 @@ impl MePool {
pick_sample_size,
)
} else {
if self.me_deterministic_writer_sort.load(Ordering::Relaxed) {
if self
.writer_selection_policy
.me_deterministic_writer_sort
.load(Ordering::Relaxed)
{
candidate_indices.sort_by(|lhs, rhs| {
let left = &writers_snapshot[*lhs];
let right = &writers_snapshot[*rhs];
@@ -490,18 +496,18 @@ impl MePool {
.increment_me_writer_pick_blocking_fallback_total();
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
let (payload, meta) = build_routed_payload(effective_our_addr);
let reserve_result = if let Some(timeout) = self.route_runtime.me_route_blocking_send_timeout
{
match tokio::time::timeout(timeout, w.tx.clone().reserve_owned()).await {
Ok(result) => result,
Err(_) => {
self.stats.increment_me_writer_pick_full_total(pick_mode);
continue;
let reserve_result =
if let Some(timeout) = self.route_runtime.me_route_blocking_send_timeout {
match tokio::time::timeout(timeout, w.tx.clone().reserve_owned()).await {
Ok(result) => result,
Err(_) => {
self.stats.increment_me_writer_pick_full_total(pick_mode);
continue;
}
}
}
} else {
w.tx.clone().reserve_owned().await
};
} else {
w.tx.clone().reserve_owned().await
};
match reserve_result {
Ok(permit) => {
if !self.registry.bind_writer(conn_id, w.id, meta).await {
@@ -637,8 +643,7 @@ impl MePool {
hybrid_last_recovery_at: &mut Option<Instant>,
hybrid_wait_step: Duration,
) {
if !self.try_consume_hybrid_recovery_trigger_slot(HYBRID_RECOVERY_TRIGGER_MIN_INTERVAL_MS)
{
if !self.try_consume_hybrid_recovery_trigger_slot(HYBRID_RECOVERY_TRIGGER_MIN_INTERVAL_MS) {
return;
}
if let Some(last) = *hybrid_last_recovery_at
@@ -691,12 +696,8 @@ impl MePool {
match self
.route_runtime
.me_route_hybrid_timeout_warn_epoch_ms
.compare_exchange_weak(
last_warn_ms,
now_ms,
Ordering::AcqRel,
Ordering::Relaxed,
) {
.compare_exchange_weak(last_warn_ms, now_ms, Ordering::AcqRel, Ordering::Relaxed)
{
Ok(_) => {
warn!(
routed_dc,
@@ -724,12 +725,8 @@ impl MePool {
match self
.route_runtime
.me_async_recovery_last_trigger_epoch_ms
.compare_exchange_weak(
last_trigger_ms,
now_ms,
Ordering::AcqRel,
Ordering::Relaxed,
) {
.compare_exchange_weak(last_trigger_ms, now_ms, Ordering::AcqRel, Ordering::Relaxed)
{
Ok(_) => return true,
Err(actual) => last_trigger_ms = actual,
}