From d050c4794afac55997661a4995d87d3bda0e4b58 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 23 Feb 2026 00:50:10 +0300 Subject: [PATCH] Hotpath tunings Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/proxy/middle_relay.rs | 63 +++++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 15 deletions(-) diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index fe4e219..b7f22ae 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use tracing::{debug, info, trace, warn}; use crate::config::ProxyConfig; @@ -14,6 +14,11 @@ use crate::stats::Stats; use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag}; +enum C2MeCommand { + Data { payload: Vec, flags: u32 }, + Close, +} + pub(crate) async fn handle_via_middle_proxy( mut crypto_reader: CryptoReader, crypto_writer: CryptoWriter, @@ -59,6 +64,30 @@ where let frame_limit = config.general.max_client_frame; + let (c2me_tx, mut c2me_rx) = mpsc::channel::(1024); + let me_pool_c2me = me_pool.clone(); + let c2me_sender = tokio::spawn(async move { + while let Some(cmd) = c2me_rx.recv().await { + match cmd { + C2MeCommand::Data { payload, flags } => { + me_pool_c2me.send_proxy_req( + conn_id, + success.dc_idx, + peer, + translated_local_addr, + &payload, + flags, + ).await?; + } + C2MeCommand::Close => { + let _ = me_pool_c2me.send_close(conn_id).await; + return Ok(()); + } + } + } + Ok(()) + }); + let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); let mut me_rx_task = me_rx; let stats_clone = stats.clone(); @@ -147,22 +176,20 @@ where if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) { flags |= RPC_FLAG_NOT_ENCRYPTED; } - if let Err(e) = me_pool.send_proxy_req( - conn_id, - success.dc_idx, - peer, - translated_local_addr, - &payload, - flags, - ).await { - main_result = Err(e); + // Keep client read loop lightweight: route heavy ME send path via a dedicated task. + if c2me_tx + .send(C2MeCommand::Data { payload, flags }) + .await + .is_err() + { + main_result = Err(ProxyError::Proxy("ME sender channel closed".into())); break; } } Ok(None) => { debug!(conn_id, "Client EOF"); client_closed = true; - let _ = me_pool.send_close(conn_id).await; + let _ = c2me_tx.send(C2MeCommand::Close).await; break; } Err(e) => { @@ -172,6 +199,11 @@ where } } + drop(c2me_tx); + let c2me_result = c2me_sender + .await + .unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME sender join error: {e}")))); + let _ = stop_tx.send(()); let mut writer_result = me_writer .await @@ -187,10 +219,11 @@ where } } - let result = match (main_result, writer_result) { - (Ok(()), Ok(())) => Ok(()), - (Err(e), _) => Err(e), - (_, Err(e)) => Err(e), + let result = match (main_result, c2me_result, writer_result) { + (Ok(()), Ok(()), Ok(())) => Ok(()), + (Err(e), _, _) => Err(e), + (_, Err(e), _) => Err(e), + (_, _, Err(e)) => Err(e), }; debug!(user = %user, conn_id, "ME relay cleanup");