DC to Client fine tuning

This commit is contained in:
Alexey
2026-03-08 04:51:46 +03:00
parent b41257f54e
commit 633af93b19
12 changed files with 477 additions and 72 deletions

View File

@@ -64,6 +64,8 @@ where
client_writer,
tg_reader,
tg_writer,
config.general.direct_relay_copy_buf_c2s_bytes,
config.general.direct_relay_copy_buf_s2c_bytes,
user,
Arc::clone(&stats),
buffer_pool,

View File

@@ -30,6 +30,8 @@ const DESYNC_ERROR_CLASS: &str = "frame_too_large_crypto_desync";
const C2ME_CHANNEL_CAPACITY_FALLBACK: usize = 128;
const C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS: usize = 64;
const C2ME_SENDER_FAIRNESS_BUDGET: usize = 32;
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
static DESYNC_DEDUP: OnceLock<Mutex<HashMap<u64, Instant>>> = OnceLock::new();
struct RelayForensicsState {
@@ -44,6 +46,31 @@ struct RelayForensicsState {
desync_all_full: bool,
}
#[derive(Clone, Copy)]
struct MeD2cFlushPolicy {
max_frames: usize,
max_bytes: usize,
max_delay: Duration,
ack_flush_immediate: bool,
}
impl MeD2cFlushPolicy {
fn from_config(config: &ProxyConfig) -> Self {
Self {
max_frames: config
.general
.me_d2c_flush_batch_max_frames
.max(ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN),
max_bytes: config
.general
.me_d2c_flush_batch_max_bytes
.max(ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN),
max_delay: Duration::from_micros(config.general.me_d2c_flush_batch_max_delay_us),
ack_flush_immediate: config.general.me_d2c_ack_flush_immediate,
}
}
}
fn hash_value<T: Hash>(value: &T) -> u64 {
let mut hasher = DefaultHasher::new();
value.hash(&mut hasher);
@@ -313,71 +340,152 @@ where
let rng_clone = rng.clone();
let user_clone = user.clone();
let bytes_me2c_clone = bytes_me2c.clone();
let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config);
let me_writer = tokio::spawn(async move {
let mut writer = crypto_writer;
let mut frame_buf = Vec::with_capacity(16 * 1024);
loop {
tokio::select! {
msg = me_rx_task.recv() => {
match msg {
Some(MeResponse::Data { flags, data }) => {
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
bytes_me2c_clone.fetch_add(data.len() as u64, Ordering::Relaxed);
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
write_client_payload(
&mut writer,
proto_tag,
flags,
&data,
rng_clone.as_ref(),
&mut frame_buf,
)
.await?;
let Some(first) = msg else {
debug!(conn_id, "ME channel closed");
return Err(ProxyError::Proxy("ME connection lost".into()));
};
// Drain all immediately queued ME responses and flush once.
while let Ok(next) = me_rx_task.try_recv() {
match next {
MeResponse::Data { flags, data } => {
trace!(conn_id, bytes = data.len(), flags, "ME->C data (batched)");
bytes_me2c_clone.fetch_add(data.len() as u64, Ordering::Relaxed);
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
write_client_payload(
&mut writer,
proto_tag,
flags,
&data,
rng_clone.as_ref(),
&mut frame_buf,
).await?;
let mut batch_frames = 0usize;
let mut batch_bytes = 0usize;
let mut flush_immediately = false;
match process_me_writer_response(
first,
&mut writer,
proto_tag,
rng_clone.as_ref(),
&mut frame_buf,
stats_clone.as_ref(),
&user_clone,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
false,
).await? {
MeWriterResponseOutcome::Continue { frames, bytes, flush_immediately: immediate } => {
batch_frames = batch_frames.saturating_add(frames);
batch_bytes = batch_bytes.saturating_add(bytes);
flush_immediately = immediate;
}
MeWriterResponseOutcome::Close => {
let _ = writer.flush().await;
return Ok(());
}
}
while !flush_immediately
&& batch_frames < d2c_flush_policy.max_frames
&& batch_bytes < d2c_flush_policy.max_bytes
{
let Ok(next) = me_rx_task.try_recv() else {
break;
};
match process_me_writer_response(
next,
&mut writer,
proto_tag,
rng_clone.as_ref(),
&mut frame_buf,
stats_clone.as_ref(),
&user_clone,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
true,
).await? {
MeWriterResponseOutcome::Continue { frames, bytes, flush_immediately: immediate } => {
batch_frames = batch_frames.saturating_add(frames);
batch_bytes = batch_bytes.saturating_add(bytes);
flush_immediately |= immediate;
}
MeWriterResponseOutcome::Close => {
let _ = writer.flush().await;
return Ok(());
}
}
}
if !flush_immediately
&& !d2c_flush_policy.max_delay.is_zero()
&& batch_frames < d2c_flush_policy.max_frames
&& batch_bytes < d2c_flush_policy.max_bytes
{
match tokio::time::timeout(d2c_flush_policy.max_delay, me_rx_task.recv()).await {
Ok(Some(next)) => {
match process_me_writer_response(
next,
&mut writer,
proto_tag,
rng_clone.as_ref(),
&mut frame_buf,
stats_clone.as_ref(),
&user_clone,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
true,
).await? {
MeWriterResponseOutcome::Continue { frames, bytes, flush_immediately: immediate } => {
batch_frames = batch_frames.saturating_add(frames);
batch_bytes = batch_bytes.saturating_add(bytes);
flush_immediately |= immediate;
}
MeResponse::Ack(confirm) => {
trace!(conn_id, confirm, "ME->C quickack (batched)");
write_client_ack(&mut writer, proto_tag, confirm).await?;
}
MeResponse::Close => {
debug!(conn_id, "ME sent close (batched)");
MeWriterResponseOutcome::Close => {
let _ = writer.flush().await;
return Ok(());
}
}
}
writer.flush().await.map_err(ProxyError::Io)?;
}
Some(MeResponse::Ack(confirm)) => {
trace!(conn_id, confirm, "ME->C quickack");
write_client_ack(&mut writer, proto_tag, confirm).await?;
}
Some(MeResponse::Close) => {
debug!(conn_id, "ME sent close");
let _ = writer.flush().await;
return Ok(());
}
None => {
debug!(conn_id, "ME channel closed");
return Err(ProxyError::Proxy("ME connection lost".into()));
while !flush_immediately
&& batch_frames < d2c_flush_policy.max_frames
&& batch_bytes < d2c_flush_policy.max_bytes
{
let Ok(extra) = me_rx_task.try_recv() else {
break;
};
match process_me_writer_response(
extra,
&mut writer,
proto_tag,
rng_clone.as_ref(),
&mut frame_buf,
stats_clone.as_ref(),
&user_clone,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
true,
).await? {
MeWriterResponseOutcome::Continue { frames, bytes, flush_immediately: immediate } => {
batch_frames = batch_frames.saturating_add(frames);
batch_bytes = batch_bytes.saturating_add(bytes);
flush_immediately |= immediate;
}
MeWriterResponseOutcome::Close => {
let _ = writer.flush().await;
return Ok(());
}
}
}
}
Ok(None) => {
debug!(conn_id, "ME channel closed");
return Err(ProxyError::Proxy("ME connection lost".into()));
}
Err(_) => {}
}
}
writer.flush().await.map_err(ProxyError::Io)?;
}
_ = &mut stop_rx => {
debug!(conn_id, "ME writer stop signal");
@@ -587,6 +695,81 @@ where
}
}
enum MeWriterResponseOutcome {
Continue {
frames: usize,
bytes: usize,
flush_immediately: bool,
},
Close,
}
async fn process_me_writer_response<W>(
response: MeResponse,
client_writer: &mut CryptoWriter<W>,
proto_tag: ProtoTag,
rng: &SecureRandom,
frame_buf: &mut Vec<u8>,
stats: &Stats,
user: &str,
bytes_me2c: &AtomicU64,
conn_id: u64,
ack_flush_immediate: bool,
batched: bool,
) -> Result<MeWriterResponseOutcome>
where
W: AsyncWrite + Unpin + Send + 'static,
{
match response {
MeResponse::Data { flags, data } => {
if batched {
trace!(conn_id, bytes = data.len(), flags, "ME->C data (batched)");
} else {
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
}
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
stats.add_user_octets_to(user, data.len() as u64);
write_client_payload(
client_writer,
proto_tag,
flags,
&data,
rng,
frame_buf,
)
.await?;
Ok(MeWriterResponseOutcome::Continue {
frames: 1,
bytes: data.len(),
flush_immediately: false,
})
}
MeResponse::Ack(confirm) => {
if batched {
trace!(conn_id, confirm, "ME->C quickack (batched)");
} else {
trace!(conn_id, confirm, "ME->C quickack");
}
write_client_ack(client_writer, proto_tag, confirm).await?;
Ok(MeWriterResponseOutcome::Continue {
frames: 1,
bytes: 4,
flush_immediately: ack_flush_immediate,
})
}
MeResponse::Close => {
if batched {
debug!(conn_id, "ME sent close (batched)");
} else {
debug!(conn_id, "ME sent close");
}
Ok(MeWriterResponseOutcome::Close)
}
}
}
async fn write_client_payload<W>(
client_writer: &mut CryptoWriter<W>,
proto_tag: ProtoTag,
@@ -696,9 +879,7 @@ where
client_writer
.write_all(&bytes)
.await
.map_err(ProxyError::Io)?;
// ACK should remain low-latency.
client_writer.flush().await.map_err(ProxyError::Io)
.map_err(ProxyError::Io)
}
#[cfg(test)]

View File

@@ -57,7 +57,9 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf, copy_bidirectional};
use tokio::io::{
AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf, copy_bidirectional_with_sizes,
};
use tokio::time::Instant;
use tracing::{debug, trace, warn};
use crate::error::Result;
@@ -296,9 +298,8 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
///
/// ## API compatibility
///
/// Signature is identical to the previous implementation. The `_buffer_pool`
/// parameter is retained for call-site compatibility — `copy_bidirectional`
/// manages its own internal buffers (8 KB per direction).
/// The `_buffer_pool` parameter is retained for call-site compatibility.
/// Effective relay copy buffers are configured by `c2s_buf_size` / `s2c_buf_size`.
///
/// ## Guarantees preserved
///
@@ -312,6 +313,8 @@ pub async fn relay_bidirectional<CR, CW, SR, SW>(
client_writer: CW,
server_reader: SR,
server_writer: SW,
c2s_buf_size: usize,
s2c_buf_size: usize,
user: &str,
stats: Arc<Stats>,
_buffer_pool: Arc<BufferPool>,
@@ -402,7 +405,12 @@ where
// When the watchdog fires, select! drops the copy future,
// releasing the &mut borrows on client and server.
let copy_result = tokio::select! {
result = copy_bidirectional(&mut client, &mut server) => Some(result),
result = copy_bidirectional_with_sizes(
&mut client,
&mut server,
c2s_buf_size.max(1),
s2c_buf_size.max(1),
) => Some(result),
_ = watchdog => None, // Activity timeout — cancel relay
};
@@ -463,4 +471,4 @@ where
Ok(())
}
}
}
}