Upstream Error classifier

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-03 20:49:09 +03:00
parent ec4e48808e
commit bd0dcfff15
No known key found for this signature in database
1 changed file with 93 additions and 3 deletions

View File

@ -19,6 +19,7 @@ use crate::config::{UpstreamConfig, UpstreamType};
use crate::error::{Result, ProxyError};
use crate::network::dns_overrides::{resolve_socket_addr, split_host_port};
use crate::protocol::constants::{TG_DATACENTERS_V4, TG_DATACENTERS_V6, TG_DATACENTER_PORT};
use crate::stats::Stats;
use crate::transport::socket::{create_outgoing_socket_bound, resolve_interface_ip};
use crate::transport::socks::{connect_socks4, connect_socks5};
@ -188,6 +189,8 @@ pub struct UpstreamManager {
connect_retry_attempts: u32,
connect_retry_backoff: Duration,
unhealthy_fail_threshold: u32,
connect_failfast_hard_errors: bool,
stats: Arc<Stats>,
}
impl UpstreamManager {
@ -196,6 +199,8 @@ impl UpstreamManager {
connect_retry_attempts: u32,
connect_retry_backoff_ms: u64,
unhealthy_fail_threshold: u32,
connect_failfast_hard_errors: bool,
stats: Arc<Stats>,
) -> Self {
let states = configs.into_iter()
.filter(|c| c.enabled)
@ -207,6 +212,8 @@ impl UpstreamManager {
connect_retry_attempts: connect_retry_attempts.max(1),
connect_retry_backoff: Duration::from_millis(connect_retry_backoff_ms),
unhealthy_fail_threshold: unhealthy_fail_threshold.max(1),
connect_failfast_hard_errors,
stats,
}
}
@ -349,6 +356,34 @@ impl UpstreamManager {
}
}
/// Compute the delay to sleep before the next connect retry attempt.
///
/// Returns the configured base backoff plus a random jitter of up to half
/// the base (at least 1 ms), so that many concurrent reconnects do not
/// retry in lock-step (thundering herd). A zero backoff yields
/// `Duration::ZERO`; a sub-millisecond backoff is returned unchanged
/// because it is too small to jitter at millisecond granularity.
fn retry_backoff_with_jitter(&self) -> Duration {
    if self.connect_retry_backoff.is_zero() {
        return Duration::ZERO;
    }
    let base_ms = self.connect_retry_backoff.as_millis() as u64;
    if base_ms == 0 {
        // Non-zero but < 1 ms: keep the configured value as-is.
        return self.connect_retry_backoff;
    }
    let jitter_cap_ms = (base_ms / 2).max(1);
    // rand 0.9: `rand::rng()` replaced `thread_rng()` and `gen_range` was
    // renamed to `random_range`; the old name is deprecated and breaks
    // builds with `-D warnings`.
    let jitter_ms = rand::rng().random_range(0..=jitter_cap_ms);
    // saturating_add guards against overflow for absurdly large configs.
    Duration::from_millis(base_ms.saturating_add(jitter_ms))
}
/// Classify a connect error as "hard" (not worth retrying).
///
/// Hard errors are configuration mistakes and explicit connection refusals —
/// conditions that will not resolve within a retry cycle. Everything else
/// (timeouts, resets, DNS hiccups, ...) is considered transient.
fn is_hard_connect_error(error: &ProxyError) -> bool {
    use std::io::ErrorKind;

    match error {
        // Bad configuration can never succeed on retry.
        ProxyError::Config(_) => true,
        // The peer actively refused the connection.
        ProxyError::ConnectionRefused { .. } => true,
        // For raw I/O errors, only a specific set of kinds is terminal.
        ProxyError::Io(io_err) => matches!(
            io_err.kind(),
            ErrorKind::ConnectionRefused
                | ErrorKind::AddrInUse
                | ErrorKind::AddrNotAvailable
                | ErrorKind::InvalidInput
                | ErrorKind::Unsupported
        ),
        // Anything else is assumed transient and eligible for retry.
        _ => false,
    }
}
/// Select upstream using latency-weighted random selection.
async fn select_upstream(&self, dc_idx: Option<i16>, scope: Option<&str>) -> Option<usize> {
let upstreams = self.upstreams.read().await;
@ -459,8 +494,12 @@ impl UpstreamManager {
guard.get(idx).map(|u| u.bind_rr.clone())
};
let connect_started_at = Instant::now();
let mut last_error: Option<ProxyError> = None;
let mut attempts_used = 0u32;
for attempt in 1..=self.connect_retry_attempts {
attempts_used = attempt;
self.stats.increment_upstream_connect_attempt_total();
let start = Instant::now();
match self
.connect_via_upstream(&upstream, target, bind_rr.clone())
@ -468,6 +507,13 @@ impl UpstreamManager {
{
Ok((stream, egress)) => {
let rtt_ms = start.elapsed().as_secs_f64() * 1000.0;
self.stats.increment_upstream_connect_success_total();
self.stats
.observe_upstream_connect_attempts_per_request(attempts_used);
self.stats.observe_upstream_connect_duration_ms(
connect_started_at.elapsed().as_millis() as u64,
true,
);
let mut guard = self.upstreams.write().await;
if let Some(u) = guard.get_mut(idx) {
if !u.healthy {
@ -491,7 +537,13 @@ impl UpstreamManager {
return Ok((stream, egress));
}
Err(e) => {
if attempt < self.connect_retry_attempts {
let hard_error =
self.connect_failfast_hard_errors && Self::is_hard_connect_error(&e);
if hard_error {
self.stats
.increment_upstream_connect_failfast_hard_error_total();
}
if attempt < self.connect_retry_attempts && !hard_error {
debug!(
attempt,
attempts = self.connect_retry_attempts,
@ -499,14 +551,34 @@ impl UpstreamManager {
error = %e,
"Upstream connect attempt failed, retrying"
);
if !self.connect_retry_backoff.is_zero() {
tokio::time::sleep(self.connect_retry_backoff).await;
let backoff = self.retry_backoff_with_jitter();
if !backoff.is_zero() {
tokio::time::sleep(backoff).await;
}
} else if hard_error {
debug!(
attempt,
attempts = self.connect_retry_attempts,
target = %target,
error = %e,
"Upstream connect failed with hard error, failfast is active"
);
}
last_error = Some(e);
if hard_error {
break;
}
}
}
}
self.stats.increment_upstream_connect_fail_total();
self.stats
.observe_upstream_connect_attempts_per_request(attempts_used);
self.stats.observe_upstream_connect_duration_ms(
connect_started_at.elapsed().as_millis() as u64,
false,
);
let error = last_error.unwrap_or_else(|| {
ProxyError::Config("Upstream connect attempts exhausted".to_string())
@ -514,6 +586,8 @@ impl UpstreamManager {
let mut guard = self.upstreams.write().await;
if let Some(u) = guard.get_mut(idx) {
// Intermediate attempts are intentionally ignored here.
// Health state is degraded only when the entire connect cycle fails.
u.fails += 1;
warn!(
fails = u.fails,
@ -1364,4 +1438,20 @@ mod tests {
.contains(&"198.51.100.2:443".parse::<SocketAddr>().unwrap()));
assert!(dc9.fallback.is_empty());
}
#[test]
fn hard_connect_error_classification_detects_connection_refused() {
    // A refused connection is terminal: failfast must classify it as hard.
    let refused = ProxyError::ConnectionRefused {
        addr: "127.0.0.1:443".to_string(),
    };
    assert!(UpstreamManager::is_hard_connect_error(&refused));
}
#[test]
fn hard_connect_error_classification_skips_timeouts() {
    // Timeouts are transient, so they must NOT trigger the failfast path.
    let timed_out = ProxyError::ConnectionTimeout {
        addr: "127.0.0.1:443".to_string(),
    };
    assert!(!UpstreamManager::is_hard_connect_error(&timed_out));
}
}