diff --git a/docs/API.md b/docs/API.md
index a1f0f4f..886d159 100644
--- a/docs/API.md
+++ b/docs/API.md
@@ -919,7 +919,7 @@ Note: the request contract is defined, but the corresponding route currently ret
| `me_bind_stale_ttl_secs` | `u64` | Stale writer TTL. |
| `me_single_endpoint_shadow_writers` | `u8` | Shadow writers for single-endpoint DCs. |
| `me_single_endpoint_outage_mode_enabled` | `bool` | Outage mode toggle for single-endpoint DCs. |
-| `me_single_endpoint_outage_disable_quarantine` | `bool` | Quarantine behavior in outage mode. |
+| `me_single_endpoint_outage_disable_quarantine` | `bool` | When `true`, reconnect attempts on single-endpoint outage recovery paths bypass endpoint quarantine. |
| `me_single_endpoint_outage_backoff_min_ms` | `u64` | Outage mode min reconnect backoff. |
| `me_single_endpoint_outage_backoff_max_ms` | `u64` | Outage mode max reconnect backoff. |
| `me_single_endpoint_shadow_rotate_every_secs` | `u64` | Shadow rotation interval. |
diff --git a/docs/CONFIG_PARAMS.en.md b/docs/CONFIG_PARAMS.en.md
index 1222e89..ed3796d 100644
--- a/docs/CONFIG_PARAMS.en.md
+++ b/docs/CONFIG_PARAMS.en.md
@@ -738,7 +738,7 @@ This document lists all configuration keys accepted by `config.toml`.
- `me_single_endpoint_outage_disable_quarantine`
- **Constraints / validation**: `bool`.
- - **Description**: Ignores endpoint quarantine while in single-endpoint outage mode.
+ - **Description**: When `true`, reconnect paths used for single-endpoint outage recovery bypass endpoint quarantine.
- **Example**:
```toml
@@ -788,7 +788,7 @@ This document lists all configuration keys accepted by `config.toml`.
- `me_adaptive_floor_idle_secs`
- **Constraints / validation**: `u64` (seconds).
- - **Description**: Idle time before adaptive floor may reduce the single-endpoint writer target.
+ - **Description**: Reserved adaptive-floor idle timing knob. It remains exposed in runtime config and API snapshots for compatibility; the adaptive floor planner does not currently use it.
- **Example**:
```toml
@@ -818,7 +818,7 @@ This document lists all configuration keys accepted by `config.toml`.
- `me_adaptive_floor_recover_grace_secs`
- **Constraints / validation**: `u64` (seconds).
- - **Description**: Grace period to hold static floor after activity in adaptive mode.
+ - **Description**: Reserved adaptive-floor recovery-grace knob. It remains exposed in runtime config and API snapshots for compatibility; the adaptive floor planner does not currently use it.
- **Example**:
```toml
diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs
index 257d8f3..000bca0 100644
--- a/src/transport/middle_proxy/health.rs
+++ b/src/transport/middle_proxy/health.rs
@@ -67,10 +67,8 @@ struct FamilyReconnectOutcome {
key: (i32, IpFamily),
dc: i32,
family: IpFamily,
- alive: usize,
required: usize,
endpoint_count: usize,
- restored: usize,
}
pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) {
@@ -82,8 +80,6 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c
let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new();
let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
- let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
- let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut drain_warn_next_allowed: HashMap = HashMap::new();
let mut degraded_interval = true;
@@ -109,8 +105,6 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c
&mut single_endpoint_outage,
&mut shadow_rotate_deadline,
&mut idle_refresh_next_attempt,
- &mut adaptive_idle_since,
- &mut adaptive_recover_until,
&mut floor_warn_next_allowed,
)
.await;
@@ -126,8 +120,6 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c
&mut single_endpoint_outage,
&mut shadow_rotate_deadline,
&mut idle_refresh_next_attempt,
- &mut adaptive_idle_since,
- &mut adaptive_recover_until,
&mut floor_warn_next_allowed,
)
.await;
@@ -360,8 +352,6 @@ async fn check_family(
single_endpoint_outage: &mut HashSet<(i32, IpFamily)>,
shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
- adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
- adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
) -> bool {
let enabled = match family {
@@ -393,10 +383,7 @@ async fn check_family(
let reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len());
let reconnect_sem = Arc::new(Semaphore::new(reconnect_budget));
- if pool.floor_mode() == MeFloorMode::Static {
- adaptive_idle_since.clear();
- adaptive_recover_until.clear();
- }
+ if pool.floor_mode() == MeFloorMode::Static {}
let mut live_addr_counts = HashMap::<(i32, SocketAddr), usize>::new();
let mut live_writer_ids_by_addr = HashMap::<(i32, SocketAddr), Vec>::new();
@@ -435,8 +422,6 @@ async fn check_family(
&live_addr_counts,
&live_writer_ids_by_addr,
&bound_clients_by_writer,
- adaptive_idle_since,
- adaptive_recover_until,
)
.await;
pool.set_adaptive_floor_runtime_caps(
@@ -503,8 +488,6 @@ async fn check_family(
outage_next_attempt.remove(&key);
shadow_rotate_deadline.remove(&key);
idle_refresh_next_attempt.remove(&key);
- adaptive_idle_since.remove(&key);
- adaptive_recover_until.remove(&key);
info!(
dc = %dc,
?family,
@@ -632,22 +615,28 @@ async fn check_family(
restored += 1;
continue;
}
- pool_for_reconnect
- .stats
- .increment_me_floor_cap_block_total();
- pool_for_reconnect
- .stats
- .increment_me_floor_swap_idle_failed_total();
- debug!(
- dc = %dc,
- ?family,
- alive,
- required,
- active_cap_effective_total,
- "Adaptive floor cap reached, reconnect attempt blocked"
- );
- break;
+
+ let base_req = pool_for_reconnect
+ .required_writers_for_dc_with_floor_mode(endpoints_for_dc.len(), false);
+ if alive + restored >= base_req {
+ pool_for_reconnect
+ .stats
+ .increment_me_floor_cap_block_total();
+ pool_for_reconnect
+ .stats
+ .increment_me_floor_swap_idle_failed_total();
+ debug!(
+ dc = %dc,
+ ?family,
+ alive,
+ required,
+ active_cap_effective_total,
+ "Adaptive floor cap reached, reconnect attempt blocked"
+ );
+ break;
+ }
}
+ pool_for_reconnect.stats.increment_me_reconnect_attempt();
let res = tokio::time::timeout(
pool_for_reconnect.reconnect_runtime.me_one_timeout,
pool_for_reconnect.connect_endpoints_round_robin(
@@ -663,11 +652,9 @@ async fn check_family(
pool_for_reconnect.stats.increment_me_reconnect_success();
}
Ok(false) => {
- pool_for_reconnect.stats.increment_me_reconnect_attempt();
debug!(dc = %dc, ?family, "ME round-robin reconnect failed")
}
Err(_) => {
- pool_for_reconnect.stats.increment_me_reconnect_attempt();
debug!(dc = %dc, ?family, "ME reconnect timed out");
}
}
@@ -678,10 +665,8 @@ async fn check_family(
key,
dc,
family,
- alive,
required,
endpoint_count: endpoints_for_dc.len(),
- restored,
}
});
}
@@ -695,7 +680,7 @@ async fn check_family(
}
};
let now = Instant::now();
- let now_alive = outcome.alive + outcome.restored;
+ let now_alive = live_active_writers_for_dc_family(pool, outcome.dc, outcome.family).await;
if now_alive >= outcome.required {
info!(
dc = %outcome.dc,
@@ -851,6 +836,33 @@ fn should_emit_rate_limited_warn(
false
}
+/// Counts the writers currently held in `pool.writers` that serve `dc` over `family`.
+///
+/// A writer is counted only when all of the following hold:
+/// - its `draining` flag is not set,
+/// - its `writer_dc` equals `dc`,
+/// - its contour decodes to `WriterContour::Active`,
+/// - its address is IPv4 for `IpFamily::V4` or IPv6 for `IpFamily::V6`.
+///
+/// The `draining` flag and the contour are read with `Ordering::Relaxed`, and the
+/// read guard on `pool.writers` is held for the duration of the scan.
+async fn live_active_writers_for_dc_family(pool: &Arc, dc: i32, family: IpFamily) -> usize {
+    let writers = pool.writers.read().await;
+    writers
+        .iter()
+        .filter(|writer| {
+            if writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
+                return false;
+            }
+            if writer.writer_dc != dc {
+                return false;
+            }
+            if !matches!(
+                super::pool::WriterContour::from_u8(
+                    writer.contour.load(std::sync::atomic::Ordering::Relaxed),
+                ),
+                super::pool::WriterContour::Active
+            ) {
+                return false;
+            }
+            match family {
+                IpFamily::V4 => writer.addr.is_ipv4(),
+                IpFamily::V6 => writer.addr.is_ipv6(),
+            }
+        })
+        .count()
+}
+
fn adaptive_floor_class_min(
pool: &Arc,
endpoint_count: usize,
@@ -904,8 +916,6 @@ async fn build_family_floor_plan(
live_addr_counts: &HashMap<(i32, SocketAddr), usize>,
live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec>,
bound_clients_by_writer: &HashMap,
- adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
- adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
) -> FamilyFloorPlan {
let mut entries = Vec::::new();
let mut by_dc = HashMap::::new();
@@ -921,18 +931,7 @@ async fn build_family_floor_plan(
if endpoints.is_empty() {
continue;
}
- let key = (*dc, family);
- let reduce_for_idle = should_reduce_floor_for_idle(
- pool,
- key,
- *dc,
- endpoints,
- live_writer_ids_by_addr,
- bound_clients_by_writer,
- adaptive_idle_since,
- adaptive_recover_until,
- )
- .await;
+ let _key = (*dc, family);
let base_required = pool.required_writers_for_dc(endpoints.len()).max(1);
let min_required = if is_adaptive {
adaptive_floor_class_min(pool, endpoints.len(), base_required)
@@ -947,11 +946,11 @@ async fn build_family_floor_plan(
if max_required < min_required {
max_required = min_required;
}
- let desired_raw = if is_adaptive && reduce_for_idle {
- min_required
- } else {
- base_required
- };
+            // Start from base_required (clamped into [min_required, max_required] just below) rather than
+            // proactively dropping an idle DC to a single fragile connection, which risked 0-writer blackouts.
+            // The Adaptive Floor constraint loop below will gracefully compress idle DCs (prioritized via
+            // has_bound_clients = false) down to min_required only when global capacity is reached.
+ let desired_raw = base_required;
let target_required = desired_raw.clamp(min_required, max_required);
let alive = endpoints
.iter()
@@ -1278,43 +1277,6 @@ async fn maybe_refresh_idle_writer_for_dc(
);
}
-async fn should_reduce_floor_for_idle(
- pool: &Arc,
- key: (i32, IpFamily),
- dc: i32,
- endpoints: &[SocketAddr],
- live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec>,
- bound_clients_by_writer: &HashMap,
- adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
- adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
-) -> bool {
- if pool.floor_mode() != MeFloorMode::Adaptive {
- adaptive_idle_since.remove(&key);
- adaptive_recover_until.remove(&key);
- return false;
- }
-
- let now = Instant::now();
- let writer_ids = list_writer_ids_for_endpoints(dc, endpoints, live_writer_ids_by_addr);
- let has_bound_clients = has_bound_clients_on_endpoint(&writer_ids, bound_clients_by_writer);
- if has_bound_clients {
- adaptive_idle_since.remove(&key);
- adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration());
- return false;
- }
-
- if let Some(recover_until) = adaptive_recover_until.get(&key)
- && now < *recover_until
- {
- adaptive_idle_since.remove(&key);
- return false;
- }
- adaptive_recover_until.remove(&key);
-
- let idle_since = adaptive_idle_since.entry(key).or_insert(now);
- now.saturating_duration_since(*idle_since) >= pool.adaptive_floor_idle_duration()
-}
-
fn has_bound_clients_on_endpoint(
writer_ids: &[u64],
bound_clients_by_writer: &HashMap,
@@ -1364,6 +1326,7 @@ async fn recover_single_endpoint_outage(
);
return;
};
+ pool.stats.increment_me_reconnect_attempt();
pool.stats
.increment_me_single_endpoint_outage_reconnect_attempt_total();
@@ -1439,7 +1402,6 @@ async fn recover_single_endpoint_outage(
return;
}
- pool.stats.increment_me_reconnect_attempt();
let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms);
let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms);
outage_backoff.insert(key, next_ms);
diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs
index 249d387..b89a844 100644
--- a/src/transport/middle_proxy/pool.rs
+++ b/src/transport/middle_proxy/pool.rs
@@ -1422,22 +1422,6 @@ impl MePool {
MeFloorMode::from_u8(self.floor_runtime.me_floor_mode.load(Ordering::Relaxed))
}
- pub(super) fn adaptive_floor_idle_duration(&self) -> Duration {
- Duration::from_secs(
- self.floor_runtime
- .me_adaptive_floor_idle_secs
- .load(Ordering::Relaxed),
- )
- }
-
- pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration {
- Duration::from_secs(
- self.floor_runtime
- .me_adaptive_floor_recover_grace_secs
- .load(Ordering::Relaxed),
- )
- }
-
pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize {
(self
.floor_runtime
@@ -1659,6 +1643,7 @@ impl MePool {
&self,
contour: WriterContour,
allow_coverage_override: bool,
+ writer_dc: i32,
) -> bool {
let (active_writers, warm_writers, _) = self.non_draining_writer_counts_by_contour().await;
match contour {
@@ -1670,6 +1655,43 @@ impl MePool {
if !allow_coverage_override {
return false;
}
+
+ let mut endpoints_len = 0;
+ let now_epoch = Self::now_epoch_secs();
+ if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch) {
+ if let Some(addrs) = self.proxy_map_v4.read().await.get(&writer_dc) {
+ endpoints_len += addrs.len();
+ }
+ }
+ if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch) {
+ if let Some(addrs) = self.proxy_map_v6.read().await.get(&writer_dc) {
+ endpoints_len += addrs.len();
+ }
+ }
+
+ if endpoints_len > 0 {
+ let base_req =
+ self.required_writers_for_dc_with_floor_mode(endpoints_len, false);
+ let active_for_dc = {
+ let ws = self.writers.read().await;
+ ws.iter()
+ .filter(|w| {
+ !w.draining.load(std::sync::atomic::Ordering::Relaxed)
+ && w.writer_dc == writer_dc
+ && matches!(
+ WriterContour::from_u8(
+ w.contour.load(std::sync::atomic::Ordering::Relaxed),
+ ),
+ WriterContour::Active
+ )
+ })
+ .count()
+ };
+ if active_for_dc < base_req {
+ return true;
+ }
+ }
+
let coverage_required = self.active_coverage_required_total().await;
active_writers < coverage_required
}
diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs
index 69d8aa0..f43ec3e 100644
--- a/src/transport/middle_proxy/pool_refill.rs
+++ b/src/transport/middle_proxy/pool_refill.rs
@@ -236,8 +236,18 @@ impl MePool {
let fast_retries = self.reconnect_runtime.me_reconnect_fast_retry_count.max(1);
let mut total_attempts = 0u32;
let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await;
+ let dc_endpoints = self.endpoints_for_dc(writer_dc).await;
+ let single_endpoint_dc = dc_endpoints.len() == 1 && dc_endpoints[0] == addr;
+ let bypass_quarantine_for_single_endpoint =
+ single_endpoint_dc && self.single_endpoint_outage_disable_quarantine();
- if !same_endpoint_quarantined {
+ if !same_endpoint_quarantined || bypass_quarantine_for_single_endpoint {
+ if same_endpoint_quarantined && bypass_quarantine_for_single_endpoint {
+ debug!(
+ %addr,
+ "Bypassing quarantine for immediate reconnect on single-endpoint DC"
+ );
+ }
for attempt in 0..fast_retries {
if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP {
break;
@@ -276,7 +286,6 @@ impl MePool {
);
}
- let dc_endpoints = self.endpoints_for_dc(writer_dc).await;
if dc_endpoints.is_empty() {
self.stats.increment_me_refill_failed_total();
return false;
diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs
index fae68b9..52c8fae 100644
--- a/src/transport/middle_proxy/pool_writer.rs
+++ b/src/transport/middle_proxy/pool_writer.rs
@@ -342,7 +342,7 @@ impl MePool {
allow_coverage_override: bool,
) -> Result<()> {
if !self
- .can_open_writer_for_contour(contour, allow_coverage_override)
+ .can_open_writer_for_contour(contour, allow_coverage_override, writer_dc)
.await
{
return Err(ProxyError::Proxy(format!(