Merge pull request #373 from telemt/flow-d2c

DC to Client fine tuning
Alexey 2026-03-08 04:53:16 +03:00 committed by GitHub
commit a616775f6d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 477 additions and 72 deletions

View File

@ -24,6 +24,13 @@ const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL: u32 = 256;
const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 4096;
const DEFAULT_ME_ROUTE_CHANNEL_CAPACITY: usize = 768;
const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 1024;
const DEFAULT_ME_READER_ROUTE_DATA_WAIT_MS: u64 = 2;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_FRAMES: usize = 32;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES: usize = 128 * 1024;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 1500;
const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = false;
const DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES: usize = 64 * 1024;
const DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES: usize = 256 * 1024;
const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3;
const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000;
const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000;
@ -316,6 +323,34 @@ pub(crate) fn default_me_c2me_channel_capacity() -> usize {
DEFAULT_ME_C2ME_CHANNEL_CAPACITY
}
pub(crate) fn default_me_reader_route_data_wait_ms() -> u64 {
DEFAULT_ME_READER_ROUTE_DATA_WAIT_MS
}
pub(crate) fn default_me_d2c_flush_batch_max_frames() -> usize {
DEFAULT_ME_D2C_FLUSH_BATCH_MAX_FRAMES
}
pub(crate) fn default_me_d2c_flush_batch_max_bytes() -> usize {
DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES
}
pub(crate) fn default_me_d2c_flush_batch_max_delay_us() -> u64 {
DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US
}
pub(crate) fn default_me_d2c_ack_flush_immediate() -> bool {
DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE
}
pub(crate) fn default_direct_relay_copy_buf_c2s_bytes() -> usize {
DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES
}
pub(crate) fn default_direct_relay_copy_buf_s2c_bytes() -> usize {
DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES
}
pub(crate) fn default_me_writer_pick_sample_size() -> u8 {
DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE
}

View File

@ -96,6 +96,13 @@ pub struct HotFields {
pub me_route_backpressure_base_timeout_ms: u64,
pub me_route_backpressure_high_timeout_ms: u64,
pub me_route_backpressure_high_watermark_pct: u8,
pub me_reader_route_data_wait_ms: u64,
pub me_d2c_flush_batch_max_frames: usize,
pub me_d2c_flush_batch_max_bytes: usize,
pub me_d2c_flush_batch_max_delay_us: u64,
pub me_d2c_ack_flush_immediate: bool,
pub direct_relay_copy_buf_c2s_bytes: usize,
pub direct_relay_copy_buf_s2c_bytes: usize,
pub me_health_interval_ms_unhealthy: u64,
pub me_health_interval_ms_healthy: u64,
pub me_admission_poll_ms: u64,
@ -203,6 +210,13 @@ impl HotFields {
me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms,
me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms,
me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct,
me_reader_route_data_wait_ms: cfg.general.me_reader_route_data_wait_ms,
me_d2c_flush_batch_max_frames: cfg.general.me_d2c_flush_batch_max_frames,
me_d2c_flush_batch_max_bytes: cfg.general.me_d2c_flush_batch_max_bytes,
me_d2c_flush_batch_max_delay_us: cfg.general.me_d2c_flush_batch_max_delay_us,
me_d2c_ack_flush_immediate: cfg.general.me_d2c_ack_flush_immediate,
direct_relay_copy_buf_c2s_bytes: cfg.general.direct_relay_copy_buf_c2s_bytes,
direct_relay_copy_buf_s2c_bytes: cfg.general.direct_relay_copy_buf_s2c_bytes,
me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy,
me_health_interval_ms_healthy: cfg.general.me_health_interval_ms_healthy,
me_admission_poll_ms: cfg.general.me_admission_poll_ms,
@ -352,6 +366,13 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
new.general.me_route_backpressure_high_timeout_ms;
cfg.general.me_route_backpressure_high_watermark_pct =
new.general.me_route_backpressure_high_watermark_pct;
cfg.general.me_reader_route_data_wait_ms = new.general.me_reader_route_data_wait_ms;
cfg.general.me_d2c_flush_batch_max_frames = new.general.me_d2c_flush_batch_max_frames;
cfg.general.me_d2c_flush_batch_max_bytes = new.general.me_d2c_flush_batch_max_bytes;
cfg.general.me_d2c_flush_batch_max_delay_us = new.general.me_d2c_flush_batch_max_delay_us;
cfg.general.me_d2c_ack_flush_immediate = new.general.me_d2c_ack_flush_immediate;
cfg.general.direct_relay_copy_buf_c2s_bytes = new.general.direct_relay_copy_buf_c2s_bytes;
cfg.general.direct_relay_copy_buf_s2c_bytes = new.general.direct_relay_copy_buf_s2c_bytes;
cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy;
cfg.general.me_health_interval_ms_healthy = new.general.me_health_interval_ms_healthy;
cfg.general.me_admission_poll_ms = new.general.me_admission_poll_ms;
@ -821,6 +842,7 @@ fn log_changes(
!= new_hot.me_route_backpressure_high_timeout_ms
|| old_hot.me_route_backpressure_high_watermark_pct
!= new_hot.me_route_backpressure_high_watermark_pct
|| old_hot.me_reader_route_data_wait_ms != new_hot.me_reader_route_data_wait_ms
|| old_hot.me_health_interval_ms_unhealthy
!= new_hot.me_health_interval_ms_unhealthy
|| old_hot.me_health_interval_ms_healthy != new_hot.me_health_interval_ms_healthy
@ -828,10 +850,11 @@ fn log_changes(
|| old_hot.me_warn_rate_limit_ms != new_hot.me_warn_rate_limit_ms
{
info!(
"config reload: me_route_backpressure: base={}ms high={}ms watermark={}%; me_reader_route_data_wait_ms={}; me_health_interval: unhealthy={}ms healthy={}ms; me_admission_poll={}ms; me_warn_rate_limit={}ms",
new_hot.me_route_backpressure_base_timeout_ms,
new_hot.me_route_backpressure_high_timeout_ms,
new_hot.me_route_backpressure_high_watermark_pct,
new_hot.me_reader_route_data_wait_ms,
new_hot.me_health_interval_ms_unhealthy,
new_hot.me_health_interval_ms_healthy,
new_hot.me_admission_poll_ms,
@ -839,6 +862,24 @@ fn log_changes(
);
}
if old_hot.me_d2c_flush_batch_max_frames != new_hot.me_d2c_flush_batch_max_frames
|| old_hot.me_d2c_flush_batch_max_bytes != new_hot.me_d2c_flush_batch_max_bytes
|| old_hot.me_d2c_flush_batch_max_delay_us != new_hot.me_d2c_flush_batch_max_delay_us
|| old_hot.me_d2c_ack_flush_immediate != new_hot.me_d2c_ack_flush_immediate
|| old_hot.direct_relay_copy_buf_c2s_bytes != new_hot.direct_relay_copy_buf_c2s_bytes
|| old_hot.direct_relay_copy_buf_s2c_bytes != new_hot.direct_relay_copy_buf_s2c_bytes
{
info!(
"config reload: relay_tuning: me_d2c_frames={} me_d2c_bytes={} me_d2c_delay_us={} me_ack_flush_immediate={} direct_buf_c2s={} direct_buf_s2c={}",
new_hot.me_d2c_flush_batch_max_frames,
new_hot.me_d2c_flush_batch_max_bytes,
new_hot.me_d2c_flush_batch_max_delay_us,
new_hot.me_d2c_ack_flush_immediate,
new_hot.direct_relay_copy_buf_c2s_bytes,
new_hot.direct_relay_copy_buf_s2c_bytes,
);
}
if old_hot.users != new_hot.users {
let mut added: Vec<&String> = new_hot.users.keys()
.filter(|u| !old_hot.users.contains_key(*u))

View File

@ -303,6 +303,42 @@ impl ProxyConfig {
));
}
if config.general.me_reader_route_data_wait_ms > 20 {
return Err(ProxyError::Config(
"general.me_reader_route_data_wait_ms must be within [0, 20]".to_string(),
));
}
if !(1..=512).contains(&config.general.me_d2c_flush_batch_max_frames) {
return Err(ProxyError::Config(
"general.me_d2c_flush_batch_max_frames must be within [1, 512]".to_string(),
));
}
if !(4096..=2 * 1024 * 1024).contains(&config.general.me_d2c_flush_batch_max_bytes) {
return Err(ProxyError::Config(
"general.me_d2c_flush_batch_max_bytes must be within [4096, 2097152]".to_string(),
));
}
if config.general.me_d2c_flush_batch_max_delay_us > 5000 {
return Err(ProxyError::Config(
"general.me_d2c_flush_batch_max_delay_us must be within [0, 5000]".to_string(),
));
}
if !(4096..=1024 * 1024).contains(&config.general.direct_relay_copy_buf_c2s_bytes) {
return Err(ProxyError::Config(
"general.direct_relay_copy_buf_c2s_bytes must be within [4096, 1048576]".to_string(),
));
}
if !(8192..=2 * 1024 * 1024).contains(&config.general.direct_relay_copy_buf_s2c_bytes) {
return Err(ProxyError::Config(
"general.direct_relay_copy_buf_s2c_bytes must be within [8192, 2097152]".to_string(),
));
}
if config.general.me_health_interval_ms_unhealthy == 0 {
return Err(ProxyError::Config(
"general.me_health_interval_ms_unhealthy must be > 0".to_string(),

View File

@ -458,6 +458,36 @@ pub struct GeneralConfig {
#[serde(default = "default_me_c2me_channel_capacity")] #[serde(default = "default_me_c2me_channel_capacity")]
pub me_c2me_channel_capacity: usize, pub me_c2me_channel_capacity: usize,
/// Bounded wait in milliseconds for routing ME DATA to the per-connection queue.
/// `0` keeps legacy no-wait behavior.
#[serde(default = "default_me_reader_route_data_wait_ms")]
pub me_reader_route_data_wait_ms: u64,
/// Maximum number of ME->Client responses coalesced before flush.
#[serde(default = "default_me_d2c_flush_batch_max_frames")]
pub me_d2c_flush_batch_max_frames: usize,
/// Maximum total payload bytes coalesced before flush.
#[serde(default = "default_me_d2c_flush_batch_max_bytes")]
pub me_d2c_flush_batch_max_bytes: usize,
/// Maximum wait in microseconds to coalesce additional ME->Client responses.
/// `0` disables timed coalescing.
#[serde(default = "default_me_d2c_flush_batch_max_delay_us")]
pub me_d2c_flush_batch_max_delay_us: u64,
/// Flush client writer immediately after quick-ack write.
#[serde(default = "default_me_d2c_ack_flush_immediate")]
pub me_d2c_ack_flush_immediate: bool,
/// Copy buffer size for client->DC direction in direct relay.
#[serde(default = "default_direct_relay_copy_buf_c2s_bytes")]
pub direct_relay_copy_buf_c2s_bytes: usize,
/// Copy buffer size for DC->client direction in direct relay.
#[serde(default = "default_direct_relay_copy_buf_s2c_bytes")]
pub direct_relay_copy_buf_s2c_bytes: usize,
/// Max pending ciphertext buffer per client writer (bytes).
/// Controls FakeTLS backpressure vs throughput.
#[serde(default = "default_crypto_pending_buffer")]
@ -861,6 +891,13 @@ impl Default for GeneralConfig {
me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(),
me_route_channel_capacity: default_me_route_channel_capacity(),
me_c2me_channel_capacity: default_me_c2me_channel_capacity(),
me_reader_route_data_wait_ms: default_me_reader_route_data_wait_ms(),
me_d2c_flush_batch_max_frames: default_me_d2c_flush_batch_max_frames(),
me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(),
me_d2c_flush_batch_max_delay_us: default_me_d2c_flush_batch_max_delay_us(),
me_d2c_ack_flush_immediate: default_me_d2c_ack_flush_immediate(),
direct_relay_copy_buf_c2s_bytes: default_direct_relay_copy_buf_c2s_bytes(),
direct_relay_copy_buf_s2c_bytes: default_direct_relay_copy_buf_s2c_bytes(),
me_warmup_stagger_enabled: default_true(),
me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
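All seven new knobs are ordinary serde-defaulted public fields, so they can also be exercised directly in Rust. A minimal sketch, assuming only the GeneralConfig definition and Default impl shown in this diff; the override values are illustrative and sit inside the validation ranges added earlier in this PR:

// Sketch (crate-internal, not part of the diff): programmatic override of the new DC->client knobs.
fn tuned_general() -> GeneralConfig {
    let mut general = GeneralConfig::default();
    general.me_reader_route_data_wait_ms = 5;             // bounded DATA routing wait, ms (0 = legacy no-wait)
    general.me_d2c_flush_batch_max_frames = 64;           // coalesce up to 64 ME->Client frames per flush
    general.me_d2c_flush_batch_max_bytes = 256 * 1024;    // ... or up to 256 KiB of payload
    general.me_d2c_flush_batch_max_delay_us = 1_000;      // wait at most 1 ms for more data
    general.me_d2c_ack_flush_immediate = true;            // flush right after each quick-ack
    general.direct_relay_copy_buf_c2s_bytes = 128 * 1024; // client->DC copy buffer
    general.direct_relay_copy_buf_s2c_bytes = 512 * 1024; // DC->client copy buffer
    general
}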

View File

@ -1055,6 +1055,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
config.general.me_route_backpressure_base_timeout_ms,
config.general.me_route_backpressure_high_timeout_ms,
config.general.me_route_backpressure_high_watermark_pct,
config.general.me_reader_route_data_wait_ms,
config.general.me_health_interval_ms_unhealthy,
config.general.me_health_interval_ms_healthy,
config.general.me_warn_rate_limit_ms,
@ -1559,6 +1560,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
cfg.general.me_route_backpressure_base_timeout_ms,
cfg.general.me_route_backpressure_high_timeout_ms,
cfg.general.me_route_backpressure_high_watermark_pct,
cfg.general.me_reader_route_data_wait_ms,
);
}
}

View File

@ -64,6 +64,8 @@ where
client_writer,
tg_reader,
tg_writer,
config.general.direct_relay_copy_buf_c2s_bytes,
config.general.direct_relay_copy_buf_s2c_bytes,
user,
Arc::clone(&stats),
buffer_pool,

View File

@ -30,6 +30,8 @@ const DESYNC_ERROR_CLASS: &str = "frame_too_large_crypto_desync";
const C2ME_CHANNEL_CAPACITY_FALLBACK: usize = 128;
const C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS: usize = 64;
const C2ME_SENDER_FAIRNESS_BUDGET: usize = 32;
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
static DESYNC_DEDUP: OnceLock<Mutex<HashMap<u64, Instant>>> = OnceLock::new();
struct RelayForensicsState {
@ -44,6 +46,31 @@ struct RelayForensicsState {
desync_all_full: bool,
}
#[derive(Clone, Copy)]
struct MeD2cFlushPolicy {
max_frames: usize,
max_bytes: usize,
max_delay: Duration,
ack_flush_immediate: bool,
}
impl MeD2cFlushPolicy {
fn from_config(config: &ProxyConfig) -> Self {
Self {
max_frames: config
.general
.me_d2c_flush_batch_max_frames
.max(ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN),
max_bytes: config
.general
.me_d2c_flush_batch_max_bytes
.max(ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN),
max_delay: Duration::from_micros(config.general.me_d2c_flush_batch_max_delay_us),
ack_flush_immediate: config.general.me_d2c_ack_flush_immediate,
}
}
}
fn hash_value<T: Hash>(value: &T) -> u64 {
let mut hasher = DefaultHasher::new();
value.hash(&mut hasher);
@ -313,71 +340,152 @@ where
let rng_clone = rng.clone();
let user_clone = user.clone();
let bytes_me2c_clone = bytes_me2c.clone();
let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config);

let me_writer = tokio::spawn(async move {
    let mut writer = crypto_writer;
    let mut frame_buf = Vec::with_capacity(16 * 1024);
    loop {
        tokio::select! {
            msg = me_rx_task.recv() => {
                let Some(first) = msg else {
                    debug!(conn_id, "ME channel closed");
                    return Err(ProxyError::Proxy("ME connection lost".into()));
                };

                let mut batch_frames = 0usize;
                let mut batch_bytes = 0usize;
                let mut flush_immediately = false;

                match process_me_writer_response(
                    first,
                    &mut writer,
                    proto_tag,
                    rng_clone.as_ref(),
                    &mut frame_buf,
                    stats_clone.as_ref(),
                    &user_clone,
                    bytes_me2c_clone.as_ref(),
                    conn_id,
                    d2c_flush_policy.ack_flush_immediate,
                    false,
                ).await? {
                    MeWriterResponseOutcome::Continue { frames, bytes, flush_immediately: immediate } => {
                        batch_frames = batch_frames.saturating_add(frames);
                        batch_bytes = batch_bytes.saturating_add(bytes);
                        flush_immediately = immediate;
                    }
                    MeWriterResponseOutcome::Close => {
                        let _ = writer.flush().await;
                        return Ok(());
                    }
                }

                while !flush_immediately
                    && batch_frames < d2c_flush_policy.max_frames
                    && batch_bytes < d2c_flush_policy.max_bytes
                {
                    let Ok(next) = me_rx_task.try_recv() else {
                        break;
                    };
                    match process_me_writer_response(
                        next,
                        &mut writer,
                        proto_tag,
                        rng_clone.as_ref(),
                        &mut frame_buf,
                        stats_clone.as_ref(),
                        &user_clone,
                        bytes_me2c_clone.as_ref(),
                        conn_id,
                        d2c_flush_policy.ack_flush_immediate,
                        true,
                    ).await? {
                        MeWriterResponseOutcome::Continue { frames, bytes, flush_immediately: immediate } => {
                            batch_frames = batch_frames.saturating_add(frames);
                            batch_bytes = batch_bytes.saturating_add(bytes);
                            flush_immediately |= immediate;
                        }
                        MeWriterResponseOutcome::Close => {
                            let _ = writer.flush().await;
                            return Ok(());
                        }
                    }
                }

                if !flush_immediately
                    && !d2c_flush_policy.max_delay.is_zero()
                    && batch_frames < d2c_flush_policy.max_frames
                    && batch_bytes < d2c_flush_policy.max_bytes
                {
                    match tokio::time::timeout(d2c_flush_policy.max_delay, me_rx_task.recv()).await {
                        Ok(Some(next)) => {
                            match process_me_writer_response(
                                next,
                                &mut writer,
                                proto_tag,
                                rng_clone.as_ref(),
                                &mut frame_buf,
                                stats_clone.as_ref(),
                                &user_clone,
                                bytes_me2c_clone.as_ref(),
                                conn_id,
                                d2c_flush_policy.ack_flush_immediate,
                                true,
                            ).await? {
                                MeWriterResponseOutcome::Continue { frames, bytes, flush_immediately: immediate } => {
                                    batch_frames = batch_frames.saturating_add(frames);
                                    batch_bytes = batch_bytes.saturating_add(bytes);
                                    flush_immediately |= immediate;
                                }
                                MeWriterResponseOutcome::Close => {
                                    let _ = writer.flush().await;
                                    return Ok(());
                                }
                            }

                            while !flush_immediately
                                && batch_frames < d2c_flush_policy.max_frames
                                && batch_bytes < d2c_flush_policy.max_bytes
                            {
                                let Ok(extra) = me_rx_task.try_recv() else {
                                    break;
                                };
                                match process_me_writer_response(
                                    extra,
                                    &mut writer,
                                    proto_tag,
                                    rng_clone.as_ref(),
                                    &mut frame_buf,
                                    stats_clone.as_ref(),
                                    &user_clone,
                                    bytes_me2c_clone.as_ref(),
                                    conn_id,
                                    d2c_flush_policy.ack_flush_immediate,
                                    true,
                                ).await? {
                                    MeWriterResponseOutcome::Continue { frames, bytes, flush_immediately: immediate } => {
                                        batch_frames = batch_frames.saturating_add(frames);
                                        batch_bytes = batch_bytes.saturating_add(bytes);
                                        flush_immediately |= immediate;
                                    }
                                    MeWriterResponseOutcome::Close => {
                                        let _ = writer.flush().await;
                                        return Ok(());
                                    }
                                }
                            }
                        }
                        Ok(None) => {
                            debug!(conn_id, "ME channel closed");
                            return Err(ProxyError::Proxy("ME connection lost".into()));
                        }
                        Err(_) => {}
                    }
                }

                writer.flush().await.map_err(ProxyError::Io)?;
            }
            _ = &mut stop_rx => {
                debug!(conn_id, "ME writer stop signal");
@ -587,6 +695,81 @@ where
}
}
enum MeWriterResponseOutcome {
Continue {
frames: usize,
bytes: usize,
flush_immediately: bool,
},
Close,
}
async fn process_me_writer_response<W>(
response: MeResponse,
client_writer: &mut CryptoWriter<W>,
proto_tag: ProtoTag,
rng: &SecureRandom,
frame_buf: &mut Vec<u8>,
stats: &Stats,
user: &str,
bytes_me2c: &AtomicU64,
conn_id: u64,
ack_flush_immediate: bool,
batched: bool,
) -> Result<MeWriterResponseOutcome>
where
W: AsyncWrite + Unpin + Send + 'static,
{
match response {
MeResponse::Data { flags, data } => {
if batched {
trace!(conn_id, bytes = data.len(), flags, "ME->C data (batched)");
} else {
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
}
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
stats.add_user_octets_to(user, data.len() as u64);
write_client_payload(
client_writer,
proto_tag,
flags,
&data,
rng,
frame_buf,
)
.await?;
Ok(MeWriterResponseOutcome::Continue {
frames: 1,
bytes: data.len(),
flush_immediately: false,
})
}
MeResponse::Ack(confirm) => {
if batched {
trace!(conn_id, confirm, "ME->C quickack (batched)");
} else {
trace!(conn_id, confirm, "ME->C quickack");
}
write_client_ack(client_writer, proto_tag, confirm).await?;
Ok(MeWriterResponseOutcome::Continue {
frames: 1,
bytes: 4,
flush_immediately: ack_flush_immediate,
})
}
MeResponse::Close => {
if batched {
debug!(conn_id, "ME sent close (batched)");
} else {
debug!(conn_id, "ME sent close");
}
Ok(MeWriterResponseOutcome::Close)
}
}
}
async fn write_client_payload<W>(
client_writer: &mut CryptoWriter<W>,
proto_tag: ProtoTag,
@ -696,9 +879,7 @@ where
client_writer
    .write_all(&bytes)
    .await
    .map_err(ProxyError::Io)
}

#[cfg(test)]
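Taken together, the writer loop above keeps coalescing queued ME->Client responses only while none of the policy limits has tripped. A condensed sketch of that decision, with a hypothetical helper name that is not part of the diff:

// Hypothetical helper mirroring the loop conditions above.
fn keep_batching(policy: &MeD2cFlushPolicy, frames: usize, bytes: usize, flush_now: bool) -> bool {
    !flush_now && frames < policy.max_frames && bytes < policy.max_bytes
}
// An Ack with ack_flush_immediate = true sets flush_now, a full batch stops via the frame/byte
// limits, and max_delay only bounds the extra recv() wait before the final flush of the batch.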

View File

@ -57,7 +57,9 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{
AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf, copy_bidirectional_with_sizes,
};
use tokio::time::Instant;
use tracing::{debug, trace, warn};
use crate::error::Result;
@ -296,9 +298,8 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
///
/// ## API compatibility
///
/// The `_buffer_pool` parameter is retained for call-site compatibility.
/// Effective relay copy buffers are configured by `c2s_buf_size` / `s2c_buf_size`.
///
/// ## Guarantees preserved
///
@ -312,6 +313,8 @@ pub async fn relay_bidirectional<CR, CW, SR, SW>(
client_writer: CW,
server_reader: SR,
server_writer: SW,
c2s_buf_size: usize,
s2c_buf_size: usize,
user: &str,
stats: Arc<Stats>,
_buffer_pool: Arc<BufferPool>,
@ -402,7 +405,12 @@ where
// When the watchdog fires, select! drops the copy future,
// releasing the &mut borrows on client and server.
let copy_result = tokio::select! {
result = copy_bidirectional_with_sizes(
&mut client,
&mut server,
c2s_buf_size.max(1),
s2c_buf_size.max(1),
) => Some(result),
_ = watchdog => None, // Activity timeout — cancel relay
};
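The switch from copy_bidirectional to copy_bidirectional_with_sizes is what makes the two new buffer-size knobs effective: the tokio primitive takes one buffer size per direction instead of the fixed 8 KiB internal buffers. A standalone sketch of the call, with illustrative socket setup:

// Sketch: tokio's copy_bidirectional_with_sizes with per-direction buffer sizes.
use tokio::io::copy_bidirectional_with_sizes;
use tokio::net::TcpStream;

async fn relay(mut client: TcpStream, mut upstream: TcpStream) -> std::io::Result<(u64, u64)> {
    // 64 KiB client->server, 256 KiB server->client: the defaults added in this PR.
    copy_bidirectional_with_sizes(&mut client, &mut upstream, 64 * 1024, 256 * 1024).await
}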

View File

@ -183,6 +183,7 @@ pub struct MePool {
pub(super) me_writer_pick_mode: AtomicU8,
pub(super) me_writer_pick_sample_size: AtomicU8,
pub(super) me_socks_kdf_policy: AtomicU8,
pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>,
pub(super) me_route_no_writer_mode: AtomicU8,
pub(super) me_route_no_writer_wait: Duration,
pub(super) me_route_inline_recovery_attempts: u32,
@ -287,6 +288,7 @@ impl MePool {
me_route_backpressure_base_timeout_ms: u64,
me_route_backpressure_high_timeout_ms: u64,
me_route_backpressure_high_watermark_pct: u8,
me_reader_route_data_wait_ms: u64,
me_health_interval_ms_unhealthy: u64,
me_health_interval_ms_healthy: u64,
me_warn_rate_limit_ms: u64,
@ -460,6 +462,7 @@ impl MePool {
me_writer_pick_mode: AtomicU8::new(me_writer_pick_mode.as_u8()),
me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)),
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)),
me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
me_route_inline_recovery_attempts,
@ -650,9 +653,12 @@ impl MePool {
route_backpressure_base_timeout_ms: u64,
route_backpressure_high_timeout_ms: u64,
route_backpressure_high_watermark_pct: u8,
reader_route_data_wait_ms: u64,
) {
self.me_socks_kdf_policy
.store(socks_kdf_policy.as_u8(), Ordering::Relaxed);
self.me_reader_route_data_wait_ms
.store(reader_route_data_wait_ms, Ordering::Relaxed);
self.registry.update_route_backpressure_policy(
route_backpressure_base_timeout_ms,
route_backpressure_high_timeout_ms,

View File

@ -208,6 +208,7 @@ impl MePool {
let keepalive_jitter_signal = self.me_keepalive_jitter;
let cancel_reader_token = cancel.clone();
let cancel_ping_token = cancel_ping.clone();
let reader_route_data_wait_ms = self.me_reader_route_data_wait_ms.clone();
tokio::spawn(async move {
let res = reader_loop(
@ -225,6 +226,7 @@ impl MePool {
writer_id,
degraded.clone(),
rtt_ema_ms_x10.clone(),
reader_route_data_wait_ms,
cancel_reader_token.clone(),
)
.await;

View File

@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::io::ErrorKind;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::time::Instant;
use bytes::{Bytes, BytesMut};
@ -35,6 +35,7 @@ pub(crate) async fn reader_loop(
_writer_id: u64,
degraded: Arc<AtomicBool>,
writer_rtt_ema_ms_x10: Arc<AtomicU32>,
reader_route_data_wait_ms: Arc<AtomicU64>,
cancel: CancellationToken,
) -> Result<()> {
let mut raw = enc_leftover;
@ -57,17 +58,14 @@ pub(crate) async fn reader_loop(
let blocks = raw.len() / 16 * 16;
if blocks > 0 {
let mut chunk = raw.split_to(blocks);
let mut new_iv = [0u8; 16];
new_iv.copy_from_slice(&chunk[blocks - 16..blocks]);
AesCbc::new(dk, div)
    .decrypt_in_place(&mut chunk[..])
    .map_err(|e| ProxyError::Crypto(format!("{e}")))?;
div = new_iv;
dec.extend_from_slice(&chunk);
}
while dec.len() >= 12 {
@ -85,7 +83,7 @@ pub(crate) async fn reader_loop(
break;
}
let frame = dec.split_to(fl).freeze();
let pe = fl - 4;
let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap());
let actual_crc = rpc_crc(crc_mode, &frame[..pe]);
@ -111,21 +109,27 @@ pub(crate) async fn reader_loop(
}
expected_seq = expected_seq.wrapping_add(1);
let payload = frame.slice(8..pe);
if payload.len() < 4 {
continue;
}
let pt = u32::from_le_bytes(payload[0..4].try_into().unwrap());
let body = payload.slice(4..);
if pt == RPC_PROXY_ANS_U32 && body.len() >= 12 {
let flags = u32::from_le_bytes(body[0..4].try_into().unwrap());
let cid = u64::from_le_bytes(body[4..12].try_into().unwrap());
let data = body.slice(12..);
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
let data_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
let routed = if data_wait_ms == 0 {
reg.route_nowait(cid, MeResponse::Data { flags, data }).await
} else {
reg.route_with_timeout(cid, MeResponse::Data { flags, data }, data_wait_ms)
.await
};
if !matches!(routed, RouteResult::Routed) {
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
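The reader-side change replaces per-frame copies with refcounted views: split_to hands the ciphertext blocks to the in-place decryptor, and Bytes::slice carves payload, body, and data out of the decrypted frame without copying the DC->client payload again. A tiny standalone sketch of the pattern, with illustrative data:

// Sketch: BytesMut::split_to + freeze + Bytes::slice share one allocation, no extra copies.
use bytes::{Bytes, BytesMut};

fn demo() {
    let mut dec = BytesMut::new();
    dec.extend_from_slice(b"\x01\x02\x03\x04payload-bytes");
    let frame: Bytes = dec.split_to(dec.len()).freeze(); // take the frame, keep the tail in `dec`
    let body: Bytes = frame.slice(4..);                  // view past the 4-byte type tag
    assert_eq!(&body[..], b"payload-bytes");
}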

View File

@ -231,6 +231,57 @@ impl ConnRegistry {
}
}
pub async fn route_with_timeout(
&self,
id: u64,
resp: MeResponse,
timeout_ms: u64,
) -> RouteResult {
if timeout_ms == 0 {
return self.route_nowait(id, resp).await;
}
let tx = {
let inner = self.inner.read().await;
inner.map.get(&id).cloned()
};
let Some(tx) = tx else {
return RouteResult::NoConn;
};
match tx.try_send(resp) {
Ok(()) => RouteResult::Routed,
Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed,
Err(TrySendError::Full(resp)) => {
let high_watermark_pct = self
.route_backpressure_high_watermark_pct
.load(Ordering::Relaxed)
.clamp(1, 100);
let used = self.route_channel_capacity.saturating_sub(tx.capacity());
let used_pct = if self.route_channel_capacity == 0 {
100
} else {
(used.saturating_mul(100) / self.route_channel_capacity) as u8
};
let high_profile = used_pct >= high_watermark_pct;
let timeout_dur = Duration::from_millis(timeout_ms.max(1));
match tokio::time::timeout(timeout_dur, tx.send(resp)).await {
Ok(Ok(())) => RouteResult::Routed,
Ok(Err(_)) => RouteResult::ChannelClosed,
Err(_) => {
if high_profile {
RouteResult::QueueFullHigh
} else {
RouteResult::QueueFullBase
}
}
}
}
}
}
pub async fn bind_writer(
&self,
conn_id: u64,