ME NoWait Routing + Upstream Connect Budget + PROXY Header timeout + allocation cuts

This commit is contained in:
Alexey
2026-03-06 03:58:08 +03:00
parent 8f3bdaec2c
commit f32c34f126
15 changed files with 279 additions and 49 deletions

View File

@@ -119,6 +119,8 @@ pub struct MePool {
pub(super) ping_tracker: Arc<Mutex<HashMap<i64, (std::time::Instant, u64)>>>,
pub(super) rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
pub(super) nat_reflection_singleflight_v4: Arc<Mutex<()>>,
pub(super) nat_reflection_singleflight_v6: Arc<Mutex<()>>,
pub(super) writer_available: Arc<Notify>,
pub(super) refill_inflight: Arc<Mutex<HashSet<SocketAddr>>>,
pub(super) refill_inflight_dc: Arc<Mutex<HashSet<RefillDcKey>>>,
@@ -323,6 +325,8 @@ impl MePool {
ping_tracker: Arc::new(Mutex::new(HashMap::new())),
rtt_stats: Arc::new(Mutex::new(HashMap::new())),
nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())),
nat_reflection_singleflight_v4: Arc::new(Mutex::new(())),
nat_reflection_singleflight_v6: Arc::new(Mutex::new(())),
writer_available: Arc::new(Notify::new()),
refill_inflight: Arc::new(Mutex::new(HashSet::new())),
refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())),

View File

@@ -248,6 +248,43 @@ impl MePool {
}
}
let _singleflight_guard = if use_shared_cache {
Some(match family {
IpFamily::V4 => self.nat_reflection_singleflight_v4.lock().await,
IpFamily::V6 => self.nat_reflection_singleflight_v6.lock().await,
})
} else {
None
};
if use_shared_cache
&& let Some(until) = *self.stun_backoff_until.read().await
&& Instant::now() < until
{
if let Ok(cache) = self.nat_reflection_cache.try_lock() {
let slot = match family {
IpFamily::V4 => cache.v4,
IpFamily::V6 => cache.v6,
};
return slot.map(|(_, addr)| addr);
}
return None;
}
if use_shared_cache
&& let Ok(mut cache) = self.nat_reflection_cache.try_lock()
{
let slot = match family {
IpFamily::V4 => &mut cache.v4,
IpFamily::V6 => &mut cache.v6,
};
if let Some((ts, addr)) = slot
&& ts.elapsed() < STUN_CACHE_TTL
{
return Some(*addr);
}
}
let attempt = if use_shared_cache {
self.nat_probe_attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
} else {

View File

@@ -124,7 +124,7 @@ pub(crate) async fn reader_loop(
let data = Bytes::copy_from_slice(&body[12..]);
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
let routed = reg.route(cid, MeResponse::Data { flags, data }).await;
let routed = reg.route_nowait(cid, MeResponse::Data { flags, data }).await;
if !matches!(routed, RouteResult::Routed) {
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
@@ -147,7 +147,7 @@ pub(crate) async fn reader_loop(
let cfm = u32::from_le_bytes(body[8..12].try_into().unwrap());
trace!(cid, cfm, "RPC_SIMPLE_ACK");
let routed = reg.route(cid, MeResponse::Ack(cfm)).await;
let routed = reg.route_nowait(cid, MeResponse::Ack(cfm)).await;
if !matches!(routed, RouteResult::Routed) {
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),

View File

@@ -208,6 +208,23 @@ impl ConnRegistry {
}
}
/// Non-blocking variant of `route`: deliver `resp` to the per-connection
/// channel for `id` without ever awaiting on a full queue.
///
/// Returns:
/// - `RouteResult::Routed` when the message was enqueued,
/// - `RouteResult::NoConn` when no channel is registered for `id`,
/// - `RouteResult::ChannelClosed` when the receiver side is gone,
/// - `RouteResult::QueueFullBase` when the queue is full — the message is
///   dropped in that case (the `Full(_)` payload is discarded), which is the
///   intended trade-off of the "nowait" path: a slow consumer cannot stall
///   the shared reader loop.
pub async fn route_nowait(&self, id: u64, resp: MeResponse) -> RouteResult {
// Clone the sender inside a short-lived read-lock scope so the lock is
// released before the send attempt.
let tx = {
let inner = self.inner.read().await;
inner.map.get(&id).cloned()
};
let Some(tx) = tx else {
return RouteResult::NoConn;
};
// try_send never blocks; map each failure mode to its drop-stat bucket
// (callers increment counters based on the returned variant).
match tx.try_send(resp) {
Ok(()) => RouteResult::Routed,
Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed,
Err(TrySendError::Full(_)) => RouteResult::QueueFullBase,
}
}
pub async fn bind_writer(
&self,
conn_id: u64,

View File

@@ -225,6 +225,7 @@ pub struct UpstreamManager {
upstreams: Arc<RwLock<Vec<UpstreamState>>>,
connect_retry_attempts: u32,
connect_retry_backoff: Duration,
connect_budget: Duration,
unhealthy_fail_threshold: u32,
connect_failfast_hard_errors: bool,
stats: Arc<Stats>,
@@ -235,6 +236,7 @@ impl UpstreamManager {
configs: Vec<UpstreamConfig>,
connect_retry_attempts: u32,
connect_retry_backoff_ms: u64,
connect_budget_ms: u64,
unhealthy_fail_threshold: u32,
connect_failfast_hard_errors: bool,
stats: Arc<Stats>,
@@ -248,6 +250,7 @@ impl UpstreamManager {
upstreams: Arc::new(RwLock::new(states)),
connect_retry_attempts: connect_retry_attempts.max(1),
connect_retry_backoff: Duration::from_millis(connect_retry_backoff_ms),
connect_budget: Duration::from_millis(connect_budget_ms.max(1)),
unhealthy_fail_threshold: unhealthy_fail_threshold.max(1),
connect_failfast_hard_errors,
stats,
@@ -593,11 +596,27 @@ impl UpstreamManager {
let mut last_error: Option<ProxyError> = None;
let mut attempts_used = 0u32;
for attempt in 1..=self.connect_retry_attempts {
let elapsed = connect_started_at.elapsed();
if elapsed >= self.connect_budget {
last_error = Some(ProxyError::ConnectionTimeout {
addr: target.to_string(),
});
break;
}
let remaining_budget = self.connect_budget.saturating_sub(elapsed);
let attempt_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS)
.min(remaining_budget);
if attempt_timeout.is_zero() {
last_error = Some(ProxyError::ConnectionTimeout {
addr: target.to_string(),
});
break;
}
attempts_used = attempt;
self.stats.increment_upstream_connect_attempt_total();
let start = Instant::now();
match self
.connect_via_upstream(&upstream, target, bind_rr.clone())
.connect_via_upstream(&upstream, target, bind_rr.clone(), attempt_timeout)
.await
{
Ok((stream, egress)) => {
@@ -707,6 +726,7 @@ impl UpstreamManager {
config: &UpstreamConfig,
target: SocketAddr,
bind_rr: Option<Arc<AtomicUsize>>,
connect_timeout: Duration,
) -> Result<(TcpStream, UpstreamEgressInfo)> {
match &config.upstream_type {
UpstreamType::Direct { interface, bind_addresses } => {
@@ -735,7 +755,6 @@ impl UpstreamManager {
let std_stream: std::net::TcpStream = socket.into();
let stream = TcpStream::from_std(std_stream)?;
let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS);
match tokio::time::timeout(connect_timeout, stream.writable()).await {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(ProxyError::Io(e)),
@@ -762,7 +781,6 @@ impl UpstreamManager {
))
},
UpstreamType::Socks4 { address, interface, user_id } => {
let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS);
// Try to parse as SocketAddr first (IP:port), otherwise treat as hostname:port
let mut stream = if let Ok(proxy_addr) = address.parse::<SocketAddr>() {
// IP:port format - use socket with optional interface binding
@@ -841,7 +859,6 @@ impl UpstreamManager {
))
},
UpstreamType::Socks5 { address, interface, username, password } => {
let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS);
// Try to parse as SocketAddr first (IP:port), otherwise treat as hostname:port
let mut stream = if let Ok(proxy_addr) = address.parse::<SocketAddr>() {
// IP:port format - use socket with optional interface binding
@@ -1165,7 +1182,14 @@ impl UpstreamManager {
target: SocketAddr,
) -> Result<f64> {
let start = Instant::now();
let _ = self.connect_via_upstream(config, target, bind_rr).await?;
let _ = self
.connect_via_upstream(
config,
target,
bind_rr,
Duration::from_secs(DC_PING_TIMEOUT_SECS),
)
.await?;
Ok(start.elapsed().as_secs_f64() * 1000.0)
}
@@ -1337,7 +1361,12 @@ impl UpstreamManager {
let start = Instant::now();
let result = tokio::time::timeout(
Duration::from_secs(HEALTH_CHECK_CONNECT_TIMEOUT_SECS),
self.connect_via_upstream(&config, endpoint, Some(bind_rr.clone())),
self.connect_via_upstream(
&config,
endpoint,
Some(bind_rr.clone()),
Duration::from_secs(HEALTH_CHECK_CONNECT_TIMEOUT_SECS),
),
)
.await;