UpstreamManager: Backoff Retries

This commit is contained in:
Alexey
2026-02-28 14:21:09 +03:00
parent 6c12af2b94
commit 9afaa28add
6 changed files with 214 additions and 33 deletions

View File

@@ -172,10 +172,18 @@ pub struct UpstreamEgressInfo {
#[derive(Clone)]
pub struct UpstreamManager {
upstreams: Arc<RwLock<Vec<UpstreamState>>>,
connect_retry_attempts: u32,
connect_retry_backoff: Duration,
unhealthy_fail_threshold: u32,
}
impl UpstreamManager {
pub fn new(configs: Vec<UpstreamConfig>) -> Self {
pub fn new(
configs: Vec<UpstreamConfig>,
connect_retry_attempts: u32,
connect_retry_backoff_ms: u64,
unhealthy_fail_threshold: u32,
) -> Self {
let states = configs.into_iter()
.filter(|c| c.enabled)
.map(UpstreamState::new)
@@ -183,6 +191,9 @@ impl UpstreamManager {
Self {
upstreams: Arc::new(RwLock::new(states)),
connect_retry_attempts: connect_retry_attempts.max(1),
connect_retry_backoff: Duration::from_millis(connect_retry_backoff_ms),
unhealthy_fail_threshold: unhealthy_fail_threshold.max(1),
}
}
@@ -430,43 +441,83 @@ impl UpstreamManager {
upstream.selected_scope = s.to_string();
}
let start = Instant::now();
let bind_rr = {
let guard = self.upstreams.read().await;
guard.get(idx).map(|u| u.bind_rr.clone())
};
match self.connect_via_upstream(&upstream, target, bind_rr).await {
Ok((stream, egress)) => {
let rtt_ms = start.elapsed().as_secs_f64() * 1000.0;
let mut guard = self.upstreams.write().await;
if let Some(u) = guard.get_mut(idx) {
if !u.healthy {
debug!(rtt_ms = format!("{:.1}", rtt_ms), "Upstream recovered");
}
u.healthy = true;
u.fails = 0;
let mut last_error: Option<ProxyError> = None;
for attempt in 1..=self.connect_retry_attempts {
let start = Instant::now();
match self
.connect_via_upstream(&upstream, target, bind_rr.clone())
.await
{
Ok((stream, egress)) => {
let rtt_ms = start.elapsed().as_secs_f64() * 1000.0;
let mut guard = self.upstreams.write().await;
if let Some(u) = guard.get_mut(idx) {
if !u.healthy {
debug!(rtt_ms = format!("{:.1}", rtt_ms), "Upstream recovered");
}
if attempt > 1 {
debug!(
attempt,
attempts = self.connect_retry_attempts,
rtt_ms = format!("{:.1}", rtt_ms),
"Upstream connect recovered after retry"
);
}
u.healthy = true;
u.fails = 0;
if let Some(di) = dc_idx.and_then(UpstreamState::dc_array_idx) {
u.dc_latency[di].update(rtt_ms);
if let Some(di) = dc_idx.and_then(UpstreamState::dc_array_idx) {
u.dc_latency[di].update(rtt_ms);
}
}
return Ok((stream, egress));
}
Ok((stream, egress))
},
Err(e) => {
let mut guard = self.upstreams.write().await;
if let Some(u) = guard.get_mut(idx) {
u.fails += 1;
warn!(fails = u.fails, "Upstream failed: {}", e);
if u.fails > 3 {
u.healthy = false;
warn!("Upstream marked unhealthy");
Err(e) => {
if attempt < self.connect_retry_attempts {
debug!(
attempt,
attempts = self.connect_retry_attempts,
target = %target,
error = %e,
"Upstream connect attempt failed, retrying"
);
if !self.connect_retry_backoff.is_zero() {
tokio::time::sleep(self.connect_retry_backoff).await;
}
}
last_error = Some(e);
}
Err(e)
}
}
let error = last_error.unwrap_or_else(|| {
ProxyError::Config("Upstream connect attempts exhausted".to_string())
});
let mut guard = self.upstreams.write().await;
if let Some(u) = guard.get_mut(idx) {
u.fails += 1;
warn!(
fails = u.fails,
attempts = self.connect_retry_attempts,
"Upstream failed after retries: {}",
error
);
if u.fails >= self.unhealthy_fail_threshold {
u.healthy = false;
warn!(
fails = u.fails,
threshold = self.unhealthy_fail_threshold,
"Upstream marked unhealthy"
);
}
}
Err(error)
}
async fn connect_via_upstream(
@@ -1035,18 +1086,26 @@ impl UpstreamManager {
u.fails += 1;
debug!(dc = dc_zero_idx + 1, fails = u.fails,
"Health check failed (both): {}", e);
if u.fails > 3 {
if u.fails >= self.unhealthy_fail_threshold {
u.healthy = false;
warn!("Upstream unhealthy (fails)");
warn!(
fails = u.fails,
threshold = self.unhealthy_fail_threshold,
"Upstream unhealthy (fails)"
);
}
}
Err(_) => {
u.fails += 1;
debug!(dc = dc_zero_idx + 1, fails = u.fails,
"Health check timeout (both)");
if u.fails > 3 {
if u.fails >= self.unhealthy_fail_threshold {
u.healthy = false;
warn!("Upstream unhealthy (timeout)");
warn!(
fails = u.fails,
threshold = self.unhealthy_fail_threshold,
"Upstream unhealthy (timeout)"
);
}
}
}
@@ -1057,9 +1116,13 @@ impl UpstreamManager {
let mut guard = self.upstreams.write().await;
let u = &mut guard[i];
u.fails += 1;
if u.fails > 3 {
if u.fails >= self.unhealthy_fail_threshold {
u.healthy = false;
warn!("Upstream unhealthy (no fallback family)");
warn!(
fails = u.fails,
threshold = self.unhealthy_fail_threshold,
"Upstream unhealthy (no fallback family)"
);
}
u.last_check = std::time::Instant::now();
}