Hotpath tunings

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-02-23 00:50:10 +03:00
parent 197f9867e0
commit d050c4794a
No known key found for this signature in database
1 changed files with 48 additions and 15 deletions

View File

@ -2,7 +2,7 @@ use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::oneshot; use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info, trace, warn}; use tracing::{debug, info, trace, warn};
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
@ -14,6 +14,11 @@ use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag}; use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
enum C2MeCommand {
Data { payload: Vec<u8>, flags: u32 },
Close,
}
pub(crate) async fn handle_via_middle_proxy<R, W>( pub(crate) async fn handle_via_middle_proxy<R, W>(
mut crypto_reader: CryptoReader<R>, mut crypto_reader: CryptoReader<R>,
crypto_writer: CryptoWriter<W>, crypto_writer: CryptoWriter<W>,
@ -59,6 +64,30 @@ where
let frame_limit = config.general.max_client_frame; let frame_limit = config.general.max_client_frame;
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(1024);
let me_pool_c2me = me_pool.clone();
let c2me_sender = tokio::spawn(async move {
while let Some(cmd) = c2me_rx.recv().await {
match cmd {
C2MeCommand::Data { payload, flags } => {
me_pool_c2me.send_proxy_req(
conn_id,
success.dc_idx,
peer,
translated_local_addr,
&payload,
flags,
).await?;
}
C2MeCommand::Close => {
let _ = me_pool_c2me.send_close(conn_id).await;
return Ok(());
}
}
}
Ok(())
});
let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
let mut me_rx_task = me_rx; let mut me_rx_task = me_rx;
let stats_clone = stats.clone(); let stats_clone = stats.clone();
@ -147,22 +176,20 @@ where
if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) { if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) {
flags |= RPC_FLAG_NOT_ENCRYPTED; flags |= RPC_FLAG_NOT_ENCRYPTED;
} }
if let Err(e) = me_pool.send_proxy_req( // Keep client read loop lightweight: route heavy ME send path via a dedicated task.
conn_id, if c2me_tx
success.dc_idx, .send(C2MeCommand::Data { payload, flags })
peer, .await
translated_local_addr, .is_err()
&payload, {
flags, main_result = Err(ProxyError::Proxy("ME sender channel closed".into()));
).await {
main_result = Err(e);
break; break;
} }
} }
Ok(None) => { Ok(None) => {
debug!(conn_id, "Client EOF"); debug!(conn_id, "Client EOF");
client_closed = true; client_closed = true;
let _ = me_pool.send_close(conn_id).await; let _ = c2me_tx.send(C2MeCommand::Close).await;
break; break;
} }
Err(e) => { Err(e) => {
@ -172,6 +199,11 @@ where
} }
} }
drop(c2me_tx);
let c2me_result = c2me_sender
.await
.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME sender join error: {e}"))));
let _ = stop_tx.send(()); let _ = stop_tx.send(());
let mut writer_result = me_writer let mut writer_result = me_writer
.await .await
@ -187,10 +219,11 @@ where
} }
} }
let result = match (main_result, writer_result) { let result = match (main_result, c2me_result, writer_result) {
(Ok(()), Ok(())) => Ok(()), (Ok(()), Ok(()), Ok(())) => Ok(()),
(Err(e), _) => Err(e), (Err(e), _, _) => Err(e),
(_, Err(e)) => Err(e), (_, Err(e), _) => Err(e),
(_, _, Err(e)) => Err(e),
}; };
debug!(user = %user, conn_id, "ME relay cleanup"); debug!(user = %user, conn_id, "ME relay cleanup");