mirror of https://github.com/telemt/telemt.git
DC -> Client Optimizations
This commit is contained in:
parent
cf82b637d2
commit
0461bc65c6
|
|
@ -29,6 +29,8 @@ const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_FRAMES: usize = 32;
|
||||||
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES: usize = 128 * 1024;
|
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES: usize = 128 * 1024;
|
||||||
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 500;
|
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 500;
|
||||||
const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = true;
|
const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = true;
|
||||||
|
const DEFAULT_ME_QUOTA_SOFT_OVERSHOOT_BYTES: u64 = 64 * 1024;
|
||||||
|
const DEFAULT_ME_D2C_FRAME_BUF_SHRINK_THRESHOLD_BYTES: usize = 256 * 1024;
|
||||||
const DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES: usize = 64 * 1024;
|
const DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES: usize = 64 * 1024;
|
||||||
const DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES: usize = 256 * 1024;
|
const DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES: usize = 256 * 1024;
|
||||||
const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3;
|
const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3;
|
||||||
|
|
@ -387,6 +389,14 @@ pub(crate) fn default_me_d2c_ack_flush_immediate() -> bool {
|
||||||
DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE
|
DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_quota_soft_overshoot_bytes() -> u64 {
|
||||||
|
DEFAULT_ME_QUOTA_SOFT_OVERSHOOT_BYTES
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_d2c_frame_buf_shrink_threshold_bytes() -> usize {
|
||||||
|
DEFAULT_ME_D2C_FRAME_BUF_SHRINK_THRESHOLD_BYTES
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_direct_relay_copy_buf_c2s_bytes() -> usize {
|
pub(crate) fn default_direct_relay_copy_buf_c2s_bytes() -> usize {
|
||||||
DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES
|
DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -106,6 +106,8 @@ pub struct HotFields {
|
||||||
pub me_d2c_flush_batch_max_bytes: usize,
|
pub me_d2c_flush_batch_max_bytes: usize,
|
||||||
pub me_d2c_flush_batch_max_delay_us: u64,
|
pub me_d2c_flush_batch_max_delay_us: u64,
|
||||||
pub me_d2c_ack_flush_immediate: bool,
|
pub me_d2c_ack_flush_immediate: bool,
|
||||||
|
pub me_quota_soft_overshoot_bytes: u64,
|
||||||
|
pub me_d2c_frame_buf_shrink_threshold_bytes: usize,
|
||||||
pub direct_relay_copy_buf_c2s_bytes: usize,
|
pub direct_relay_copy_buf_c2s_bytes: usize,
|
||||||
pub direct_relay_copy_buf_s2c_bytes: usize,
|
pub direct_relay_copy_buf_s2c_bytes: usize,
|
||||||
pub me_health_interval_ms_unhealthy: u64,
|
pub me_health_interval_ms_unhealthy: u64,
|
||||||
|
|
@ -225,6 +227,8 @@ impl HotFields {
|
||||||
me_d2c_flush_batch_max_bytes: cfg.general.me_d2c_flush_batch_max_bytes,
|
me_d2c_flush_batch_max_bytes: cfg.general.me_d2c_flush_batch_max_bytes,
|
||||||
me_d2c_flush_batch_max_delay_us: cfg.general.me_d2c_flush_batch_max_delay_us,
|
me_d2c_flush_batch_max_delay_us: cfg.general.me_d2c_flush_batch_max_delay_us,
|
||||||
me_d2c_ack_flush_immediate: cfg.general.me_d2c_ack_flush_immediate,
|
me_d2c_ack_flush_immediate: cfg.general.me_d2c_ack_flush_immediate,
|
||||||
|
me_quota_soft_overshoot_bytes: cfg.general.me_quota_soft_overshoot_bytes,
|
||||||
|
me_d2c_frame_buf_shrink_threshold_bytes: cfg.general.me_d2c_frame_buf_shrink_threshold_bytes,
|
||||||
direct_relay_copy_buf_c2s_bytes: cfg.general.direct_relay_copy_buf_c2s_bytes,
|
direct_relay_copy_buf_c2s_bytes: cfg.general.direct_relay_copy_buf_c2s_bytes,
|
||||||
direct_relay_copy_buf_s2c_bytes: cfg.general.direct_relay_copy_buf_s2c_bytes,
|
direct_relay_copy_buf_s2c_bytes: cfg.general.direct_relay_copy_buf_s2c_bytes,
|
||||||
me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy,
|
me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy,
|
||||||
|
|
@ -511,6 +515,9 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
|
||||||
cfg.general.me_d2c_flush_batch_max_bytes = new.general.me_d2c_flush_batch_max_bytes;
|
cfg.general.me_d2c_flush_batch_max_bytes = new.general.me_d2c_flush_batch_max_bytes;
|
||||||
cfg.general.me_d2c_flush_batch_max_delay_us = new.general.me_d2c_flush_batch_max_delay_us;
|
cfg.general.me_d2c_flush_batch_max_delay_us = new.general.me_d2c_flush_batch_max_delay_us;
|
||||||
cfg.general.me_d2c_ack_flush_immediate = new.general.me_d2c_ack_flush_immediate;
|
cfg.general.me_d2c_ack_flush_immediate = new.general.me_d2c_ack_flush_immediate;
|
||||||
|
cfg.general.me_quota_soft_overshoot_bytes = new.general.me_quota_soft_overshoot_bytes;
|
||||||
|
cfg.general.me_d2c_frame_buf_shrink_threshold_bytes =
|
||||||
|
new.general.me_d2c_frame_buf_shrink_threshold_bytes;
|
||||||
cfg.general.direct_relay_copy_buf_c2s_bytes = new.general.direct_relay_copy_buf_c2s_bytes;
|
cfg.general.direct_relay_copy_buf_c2s_bytes = new.general.direct_relay_copy_buf_c2s_bytes;
|
||||||
cfg.general.direct_relay_copy_buf_s2c_bytes = new.general.direct_relay_copy_buf_s2c_bytes;
|
cfg.general.direct_relay_copy_buf_s2c_bytes = new.general.direct_relay_copy_buf_s2c_bytes;
|
||||||
cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy;
|
cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy;
|
||||||
|
|
@ -1030,15 +1037,20 @@ fn log_changes(
|
||||||
|| old_hot.me_d2c_flush_batch_max_bytes != new_hot.me_d2c_flush_batch_max_bytes
|
|| old_hot.me_d2c_flush_batch_max_bytes != new_hot.me_d2c_flush_batch_max_bytes
|
||||||
|| old_hot.me_d2c_flush_batch_max_delay_us != new_hot.me_d2c_flush_batch_max_delay_us
|
|| old_hot.me_d2c_flush_batch_max_delay_us != new_hot.me_d2c_flush_batch_max_delay_us
|
||||||
|| old_hot.me_d2c_ack_flush_immediate != new_hot.me_d2c_ack_flush_immediate
|
|| old_hot.me_d2c_ack_flush_immediate != new_hot.me_d2c_ack_flush_immediate
|
||||||
|
|| old_hot.me_quota_soft_overshoot_bytes != new_hot.me_quota_soft_overshoot_bytes
|
||||||
|
|| old_hot.me_d2c_frame_buf_shrink_threshold_bytes
|
||||||
|
!= new_hot.me_d2c_frame_buf_shrink_threshold_bytes
|
||||||
|| old_hot.direct_relay_copy_buf_c2s_bytes != new_hot.direct_relay_copy_buf_c2s_bytes
|
|| old_hot.direct_relay_copy_buf_c2s_bytes != new_hot.direct_relay_copy_buf_c2s_bytes
|
||||||
|| old_hot.direct_relay_copy_buf_s2c_bytes != new_hot.direct_relay_copy_buf_s2c_bytes
|
|| old_hot.direct_relay_copy_buf_s2c_bytes != new_hot.direct_relay_copy_buf_s2c_bytes
|
||||||
{
|
{
|
||||||
info!(
|
info!(
|
||||||
"config reload: relay_tuning: me_d2c_frames={} me_d2c_bytes={} me_d2c_delay_us={} me_ack_flush_immediate={} direct_buf_c2s={} direct_buf_s2c={}",
|
"config reload: relay_tuning: me_d2c_frames={} me_d2c_bytes={} me_d2c_delay_us={} me_ack_flush_immediate={} me_quota_soft_overshoot_bytes={} me_d2c_frame_buf_shrink_threshold_bytes={} direct_buf_c2s={} direct_buf_s2c={}",
|
||||||
new_hot.me_d2c_flush_batch_max_frames,
|
new_hot.me_d2c_flush_batch_max_frames,
|
||||||
new_hot.me_d2c_flush_batch_max_bytes,
|
new_hot.me_d2c_flush_batch_max_bytes,
|
||||||
new_hot.me_d2c_flush_batch_max_delay_us,
|
new_hot.me_d2c_flush_batch_max_delay_us,
|
||||||
new_hot.me_d2c_ack_flush_immediate,
|
new_hot.me_d2c_ack_flush_immediate,
|
||||||
|
new_hot.me_quota_soft_overshoot_bytes,
|
||||||
|
new_hot.me_d2c_frame_buf_shrink_threshold_bytes,
|
||||||
new_hot.direct_relay_copy_buf_c2s_bytes,
|
new_hot.direct_relay_copy_buf_c2s_bytes,
|
||||||
new_hot.direct_relay_copy_buf_s2c_bytes,
|
new_hot.direct_relay_copy_buf_s2c_bytes,
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -533,6 +533,19 @@ impl ProxyConfig {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.general.me_quota_soft_overshoot_bytes > 16 * 1024 * 1024 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_quota_soft_overshoot_bytes must be within [0, 16777216]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !(4096..=16 * 1024 * 1024).contains(&config.general.me_d2c_frame_buf_shrink_threshold_bytes) {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_d2c_frame_buf_shrink_threshold_bytes must be within [4096, 16777216]"
|
||||||
|
.to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if !(4096..=1024 * 1024).contains(&config.general.direct_relay_copy_buf_c2s_bytes) {
|
if !(4096..=1024 * 1024).contains(&config.general.direct_relay_copy_buf_c2s_bytes) {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
"general.direct_relay_copy_buf_c2s_bytes must be within [4096, 1048576]"
|
"general.direct_relay_copy_buf_c2s_bytes must be within [4096, 1048576]"
|
||||||
|
|
|
||||||
|
|
@ -468,7 +468,7 @@ pub struct GeneralConfig {
|
||||||
pub me_c2me_send_timeout_ms: u64,
|
pub me_c2me_send_timeout_ms: u64,
|
||||||
|
|
||||||
/// Bounded wait in milliseconds for routing ME DATA to per-connection queue.
|
/// Bounded wait in milliseconds for routing ME DATA to per-connection queue.
|
||||||
/// `0` keeps legacy no-wait behavior.
|
/// `0` keeps non-blocking routing; values >0 enable bounded wait for compatibility.
|
||||||
#[serde(default = "default_me_reader_route_data_wait_ms")]
|
#[serde(default = "default_me_reader_route_data_wait_ms")]
|
||||||
pub me_reader_route_data_wait_ms: u64,
|
pub me_reader_route_data_wait_ms: u64,
|
||||||
|
|
||||||
|
|
@ -489,6 +489,14 @@ pub struct GeneralConfig {
|
||||||
#[serde(default = "default_me_d2c_ack_flush_immediate")]
|
#[serde(default = "default_me_d2c_ack_flush_immediate")]
|
||||||
pub me_d2c_ack_flush_immediate: bool,
|
pub me_d2c_ack_flush_immediate: bool,
|
||||||
|
|
||||||
|
/// Additional bytes above strict per-user quota allowed in hot-path soft mode.
|
||||||
|
#[serde(default = "default_me_quota_soft_overshoot_bytes")]
|
||||||
|
pub me_quota_soft_overshoot_bytes: u64,
|
||||||
|
|
||||||
|
/// Shrink threshold for reusable ME->Client frame assembly buffer.
|
||||||
|
#[serde(default = "default_me_d2c_frame_buf_shrink_threshold_bytes")]
|
||||||
|
pub me_d2c_frame_buf_shrink_threshold_bytes: usize,
|
||||||
|
|
||||||
/// Copy buffer size for client->DC direction in direct relay.
|
/// Copy buffer size for client->DC direction in direct relay.
|
||||||
#[serde(default = "default_direct_relay_copy_buf_c2s_bytes")]
|
#[serde(default = "default_direct_relay_copy_buf_c2s_bytes")]
|
||||||
pub direct_relay_copy_buf_c2s_bytes: usize,
|
pub direct_relay_copy_buf_c2s_bytes: usize,
|
||||||
|
|
@ -945,6 +953,8 @@ impl Default for GeneralConfig {
|
||||||
me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(),
|
me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(),
|
||||||
me_d2c_flush_batch_max_delay_us: default_me_d2c_flush_batch_max_delay_us(),
|
me_d2c_flush_batch_max_delay_us: default_me_d2c_flush_batch_max_delay_us(),
|
||||||
me_d2c_ack_flush_immediate: default_me_d2c_ack_flush_immediate(),
|
me_d2c_ack_flush_immediate: default_me_d2c_ack_flush_immediate(),
|
||||||
|
me_quota_soft_overshoot_bytes: default_me_quota_soft_overshoot_bytes(),
|
||||||
|
me_d2c_frame_buf_shrink_threshold_bytes: default_me_d2c_frame_buf_shrink_threshold_bytes(),
|
||||||
direct_relay_copy_buf_c2s_bytes: default_direct_relay_copy_buf_c2s_bytes(),
|
direct_relay_copy_buf_c2s_bytes: default_direct_relay_copy_buf_c2s_bytes(),
|
||||||
direct_relay_copy_buf_s2c_bytes: default_direct_relay_copy_buf_s2c_bytes(),
|
direct_relay_copy_buf_s2c_bytes: default_direct_relay_copy_buf_s2c_bytes(),
|
||||||
me_warmup_stagger_enabled: default_true(),
|
me_warmup_stagger_enabled: default_true(),
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,8 @@ const C2ME_SEND_TIMEOUT: Duration = Duration::from_millis(50);
|
||||||
const C2ME_SEND_TIMEOUT: Duration = Duration::from_secs(5);
|
const C2ME_SEND_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
|
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
|
||||||
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
|
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
|
||||||
|
const ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR: usize = 2;
|
||||||
|
const ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES: usize = 128 * 1024;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
const QUOTA_USER_LOCKS_MAX: usize = 64;
|
const QUOTA_USER_LOCKS_MAX: usize = 64;
|
||||||
#[cfg(not(test))]
|
#[cfg(not(test))]
|
||||||
|
|
@ -214,6 +216,8 @@ struct MeD2cFlushPolicy {
|
||||||
max_bytes: usize,
|
max_bytes: usize,
|
||||||
max_delay: Duration,
|
max_delay: Duration,
|
||||||
ack_flush_immediate: bool,
|
ack_flush_immediate: bool,
|
||||||
|
quota_soft_overshoot_bytes: u64,
|
||||||
|
frame_buf_shrink_threshold_bytes: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy)]
|
#[derive(Clone, Copy)]
|
||||||
|
|
@ -284,6 +288,11 @@ impl MeD2cFlushPolicy {
|
||||||
.max(ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN),
|
.max(ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN),
|
||||||
max_delay: Duration::from_micros(config.general.me_d2c_flush_batch_max_delay_us),
|
max_delay: Duration::from_micros(config.general.me_d2c_flush_batch_max_delay_us),
|
||||||
ack_flush_immediate: config.general.me_d2c_ack_flush_immediate,
|
ack_flush_immediate: config.general.me_d2c_ack_flush_immediate,
|
||||||
|
quota_soft_overshoot_bytes: config.general.me_quota_soft_overshoot_bytes,
|
||||||
|
frame_buf_shrink_threshold_bytes: config
|
||||||
|
.general
|
||||||
|
.me_d2c_frame_buf_shrink_threshold_bytes
|
||||||
|
.max(4096),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -538,6 +547,33 @@ fn quota_would_be_exceeded_for_user(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn quota_soft_cap(limit: u64, overshoot: u64) -> u64 {
|
||||||
|
limit.saturating_add(overshoot)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn quota_exceeded_for_user_soft(
|
||||||
|
stats: &Stats,
|
||||||
|
user: &str,
|
||||||
|
quota_limit: Option<u64>,
|
||||||
|
overshoot: u64,
|
||||||
|
) -> bool {
|
||||||
|
quota_limit.is_some_and(|quota| stats.get_user_total_octets(user) >= quota_soft_cap(quota, overshoot))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn quota_would_be_exceeded_for_user_soft(
|
||||||
|
stats: &Stats,
|
||||||
|
user: &str,
|
||||||
|
quota_limit: Option<u64>,
|
||||||
|
bytes: u64,
|
||||||
|
overshoot: u64,
|
||||||
|
) -> bool {
|
||||||
|
quota_limit.is_some_and(|quota| {
|
||||||
|
let cap = quota_soft_cap(quota, overshoot);
|
||||||
|
let used = stats.get_user_total_octets(user);
|
||||||
|
used >= cap || bytes > cap.saturating_sub(used)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
fn quota_user_lock_test_guard() -> &'static Mutex<()> {
|
fn quota_user_lock_test_guard() -> &'static Mutex<()> {
|
||||||
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
|
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
|
||||||
|
|
@ -786,6 +822,7 @@ where
|
||||||
stats_clone.as_ref(),
|
stats_clone.as_ref(),
|
||||||
&user_clone,
|
&user_clone,
|
||||||
quota_limit,
|
quota_limit,
|
||||||
|
d2c_flush_policy.quota_soft_overshoot_bytes,
|
||||||
bytes_me2c_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
conn_id,
|
conn_id,
|
||||||
d2c_flush_policy.ack_flush_immediate,
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
|
|
@ -825,6 +862,7 @@ where
|
||||||
stats_clone.as_ref(),
|
stats_clone.as_ref(),
|
||||||
&user_clone,
|
&user_clone,
|
||||||
quota_limit,
|
quota_limit,
|
||||||
|
d2c_flush_policy.quota_soft_overshoot_bytes,
|
||||||
bytes_me2c_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
conn_id,
|
conn_id,
|
||||||
d2c_flush_policy.ack_flush_immediate,
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
|
|
@ -864,6 +902,7 @@ where
|
||||||
stats_clone.as_ref(),
|
stats_clone.as_ref(),
|
||||||
&user_clone,
|
&user_clone,
|
||||||
quota_limit,
|
quota_limit,
|
||||||
|
d2c_flush_policy.quota_soft_overshoot_bytes,
|
||||||
bytes_me2c_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
conn_id,
|
conn_id,
|
||||||
d2c_flush_policy.ack_flush_immediate,
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
|
|
@ -903,6 +942,7 @@ where
|
||||||
stats_clone.as_ref(),
|
stats_clone.as_ref(),
|
||||||
&user_clone,
|
&user_clone,
|
||||||
quota_limit,
|
quota_limit,
|
||||||
|
d2c_flush_policy.quota_soft_overshoot_bytes,
|
||||||
bytes_me2c_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
conn_id,
|
conn_id,
|
||||||
d2c_flush_policy.ack_flush_immediate,
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
|
|
@ -933,6 +973,12 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
writer.flush().await.map_err(ProxyError::Io)?;
|
writer.flush().await.map_err(ProxyError::Io)?;
|
||||||
|
let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes;
|
||||||
|
let shrink_trigger = shrink_threshold
|
||||||
|
.saturating_mul(ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR);
|
||||||
|
if frame_buf.capacity() > shrink_trigger {
|
||||||
|
frame_buf.shrink_to(shrink_threshold);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_ = &mut stop_rx => {
|
_ = &mut stop_rx => {
|
||||||
debug!(conn_id, "ME writer stop signal");
|
debug!(conn_id, "ME writer stop signal");
|
||||||
|
|
@ -1482,6 +1528,7 @@ async fn process_me_writer_response<W>(
|
||||||
stats: &Stats,
|
stats: &Stats,
|
||||||
user: &str,
|
user: &str,
|
||||||
quota_limit: Option<u64>,
|
quota_limit: Option<u64>,
|
||||||
|
quota_soft_overshoot_bytes: u64,
|
||||||
bytes_me2c: &AtomicU64,
|
bytes_me2c: &AtomicU64,
|
||||||
conn_id: u64,
|
conn_id: u64,
|
||||||
ack_flush_immediate: bool,
|
ack_flush_immediate: bool,
|
||||||
|
|
@ -1498,31 +1545,32 @@ where
|
||||||
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
|
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
|
||||||
}
|
}
|
||||||
let data_len = data.len() as u64;
|
let data_len = data.len() as u64;
|
||||||
if let Some(limit) = quota_limit {
|
if quota_would_be_exceeded_for_user_soft(
|
||||||
let quota_lock = quota_user_lock(user);
|
stats,
|
||||||
let _quota_guard = quota_lock.lock().await;
|
user,
|
||||||
if quota_would_be_exceeded_for_user(stats, user, Some(limit), data_len) {
|
quota_limit,
|
||||||
return Err(ProxyError::DataQuotaExceeded {
|
data_len,
|
||||||
user: user.to_string(),
|
quota_soft_overshoot_bytes,
|
||||||
});
|
) {
|
||||||
}
|
return Err(ProxyError::DataQuotaExceeded {
|
||||||
write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
|
user: user.to_string(),
|
||||||
.await?;
|
});
|
||||||
|
}
|
||||||
|
|
||||||
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
|
write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf).await?;
|
||||||
stats.add_user_octets_to(user, data.len() as u64);
|
|
||||||
|
|
||||||
if quota_exceeded_for_user(stats, user, Some(limit)) {
|
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
|
||||||
return Err(ProxyError::DataQuotaExceeded {
|
stats.add_user_octets_to(user, data.len() as u64);
|
||||||
user: user.to_string(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
|
if quota_exceeded_for_user_soft(
|
||||||
stats.add_user_octets_to(user, data.len() as u64);
|
stats,
|
||||||
|
user,
|
||||||
|
quota_limit,
|
||||||
|
quota_soft_overshoot_bytes,
|
||||||
|
) {
|
||||||
|
return Err(ProxyError::DataQuotaExceeded {
|
||||||
|
user: user.to_string(),
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(MeWriterResponseOutcome::Continue {
|
Ok(MeWriterResponseOutcome::Continue {
|
||||||
|
|
@ -1609,28 +1657,42 @@ where
|
||||||
if quickack {
|
if quickack {
|
||||||
first |= 0x80;
|
first |= 0x80;
|
||||||
}
|
}
|
||||||
frame_buf.clear();
|
let wire_len = 1usize.saturating_add(data.len());
|
||||||
frame_buf.reserve(1 + data.len());
|
if wire_len <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES {
|
||||||
frame_buf.push(first);
|
frame_buf.clear();
|
||||||
frame_buf.extend_from_slice(data);
|
frame_buf.reserve(wire_len);
|
||||||
client_writer
|
frame_buf.push(first);
|
||||||
.write_all(frame_buf)
|
frame_buf.extend_from_slice(data);
|
||||||
.await
|
client_writer
|
||||||
.map_err(ProxyError::Io)?;
|
.write_all(frame_buf.as_slice())
|
||||||
|
.await
|
||||||
|
.map_err(ProxyError::Io)?;
|
||||||
|
} else {
|
||||||
|
let header = [first];
|
||||||
|
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
|
||||||
|
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
|
||||||
|
}
|
||||||
} else if len_words < (1 << 24) {
|
} else if len_words < (1 << 24) {
|
||||||
let mut first = 0x7fu8;
|
let mut first = 0x7fu8;
|
||||||
if quickack {
|
if quickack {
|
||||||
first |= 0x80;
|
first |= 0x80;
|
||||||
}
|
}
|
||||||
let lw = (len_words as u32).to_le_bytes();
|
let lw = (len_words as u32).to_le_bytes();
|
||||||
frame_buf.clear();
|
let wire_len = 4usize.saturating_add(data.len());
|
||||||
frame_buf.reserve(4 + data.len());
|
if wire_len <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES {
|
||||||
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
|
frame_buf.clear();
|
||||||
frame_buf.extend_from_slice(data);
|
frame_buf.reserve(wire_len);
|
||||||
client_writer
|
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
|
||||||
.write_all(frame_buf)
|
frame_buf.extend_from_slice(data);
|
||||||
.await
|
client_writer
|
||||||
.map_err(ProxyError::Io)?;
|
.write_all(frame_buf.as_slice())
|
||||||
|
.await
|
||||||
|
.map_err(ProxyError::Io)?;
|
||||||
|
} else {
|
||||||
|
let header = [first, lw[0], lw[1], lw[2]];
|
||||||
|
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
|
||||||
|
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return Err(ProxyError::Proxy(format!(
|
return Err(ProxyError::Proxy(format!(
|
||||||
"Abridged frame too large: {}",
|
"Abridged frame too large: {}",
|
||||||
|
|
@ -1650,21 +1712,40 @@ where
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
};
|
};
|
||||||
|
|
||||||
let (len_val, total) =
|
let (len_val, total) =
|
||||||
compute_intermediate_secure_wire_len(data.len(), padding_len, quickack)?;
|
compute_intermediate_secure_wire_len(data.len(), padding_len, quickack)?;
|
||||||
frame_buf.clear();
|
if total <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES {
|
||||||
frame_buf.reserve(total);
|
frame_buf.clear();
|
||||||
frame_buf.extend_from_slice(&len_val.to_le_bytes());
|
frame_buf.reserve(total);
|
||||||
frame_buf.extend_from_slice(data);
|
frame_buf.extend_from_slice(&len_val.to_le_bytes());
|
||||||
if padding_len > 0 {
|
frame_buf.extend_from_slice(data);
|
||||||
let start = frame_buf.len();
|
if padding_len > 0 {
|
||||||
frame_buf.resize(start + padding_len, 0);
|
let start = frame_buf.len();
|
||||||
rng.fill(&mut frame_buf[start..]);
|
frame_buf.resize(start + padding_len, 0);
|
||||||
|
rng.fill(&mut frame_buf[start..]);
|
||||||
|
}
|
||||||
|
client_writer
|
||||||
|
.write_all(frame_buf.as_slice())
|
||||||
|
.await
|
||||||
|
.map_err(ProxyError::Io)?;
|
||||||
|
} else {
|
||||||
|
let header = len_val.to_le_bytes();
|
||||||
|
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
|
||||||
|
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
|
||||||
|
if padding_len > 0 {
|
||||||
|
frame_buf.clear();
|
||||||
|
if frame_buf.capacity() < padding_len {
|
||||||
|
frame_buf.reserve(padding_len);
|
||||||
|
}
|
||||||
|
frame_buf.resize(padding_len, 0);
|
||||||
|
rng.fill(frame_buf.as_mut_slice());
|
||||||
|
client_writer
|
||||||
|
.write_all(frame_buf.as_slice())
|
||||||
|
.await
|
||||||
|
.map_err(ProxyError::Io)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
client_writer
|
|
||||||
.write_all(frame_buf)
|
|
||||||
.await
|
|
||||||
.map_err(ProxyError::Io)?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1540,6 +1540,7 @@ async fn process_me_writer_response_ack_obeys_flush_policy() {
|
||||||
&stats,
|
&stats,
|
||||||
"user",
|
"user",
|
||||||
None,
|
None,
|
||||||
|
0,
|
||||||
&bytes_me2c,
|
&bytes_me2c,
|
||||||
77,
|
77,
|
||||||
true,
|
true,
|
||||||
|
|
@ -1566,6 +1567,7 @@ async fn process_me_writer_response_ack_obeys_flush_policy() {
|
||||||
&stats,
|
&stats,
|
||||||
"user",
|
"user",
|
||||||
None,
|
None,
|
||||||
|
0,
|
||||||
&bytes_me2c,
|
&bytes_me2c,
|
||||||
77,
|
77,
|
||||||
false,
|
false,
|
||||||
|
|
@ -1606,6 +1608,7 @@ async fn process_me_writer_response_data_updates_byte_accounting() {
|
||||||
&stats,
|
&stats,
|
||||||
"user",
|
"user",
|
||||||
None,
|
None,
|
||||||
|
0,
|
||||||
&bytes_me2c,
|
&bytes_me2c,
|
||||||
88,
|
88,
|
||||||
false,
|
false,
|
||||||
|
|
@ -1652,6 +1655,7 @@ async fn process_me_writer_response_data_enforces_live_user_quota() {
|
||||||
&stats,
|
&stats,
|
||||||
"quota-user",
|
"quota-user",
|
||||||
Some(12),
|
Some(12),
|
||||||
|
0,
|
||||||
&bytes_me2c,
|
&bytes_me2c,
|
||||||
89,
|
89,
|
||||||
false,
|
false,
|
||||||
|
|
@ -1700,6 +1704,7 @@ async fn process_me_writer_response_concurrent_same_user_quota_does_not_overshoo
|
||||||
&stats,
|
&stats,
|
||||||
user,
|
user,
|
||||||
Some(1),
|
Some(1),
|
||||||
|
0,
|
||||||
&bytes_me2c,
|
&bytes_me2c,
|
||||||
91,
|
91,
|
||||||
false,
|
false,
|
||||||
|
|
@ -1717,6 +1722,7 @@ async fn process_me_writer_response_concurrent_same_user_quota_does_not_overshoo
|
||||||
&stats,
|
&stats,
|
||||||
user,
|
user,
|
||||||
Some(1),
|
Some(1),
|
||||||
|
0,
|
||||||
&bytes_me2c,
|
&bytes_me2c,
|
||||||
92,
|
92,
|
||||||
false,
|
false,
|
||||||
|
|
@ -1765,6 +1771,7 @@ async fn process_me_writer_response_data_does_not_forward_partial_payload_when_r
|
||||||
&stats,
|
&stats,
|
||||||
"partial-quota-user",
|
"partial-quota-user",
|
||||||
Some(4),
|
Some(4),
|
||||||
|
0,
|
||||||
&bytes_me2c,
|
&bytes_me2c,
|
||||||
90,
|
90,
|
||||||
false,
|
false,
|
||||||
|
|
@ -1970,6 +1977,7 @@ async fn run_quota_race_attempt(
|
||||||
stats,
|
stats,
|
||||||
user,
|
user,
|
||||||
Some(1),
|
Some(1),
|
||||||
|
0,
|
||||||
bytes_me2c,
|
bytes_me2c,
|
||||||
conn_id,
|
conn_id,
|
||||||
false,
|
false,
|
||||||
|
|
|
||||||
|
|
@ -126,14 +126,10 @@ pub(crate) async fn reader_loop(
|
||||||
let data = body.slice(12..);
|
let data = body.slice(12..);
|
||||||
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
|
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
|
||||||
|
|
||||||
let data_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
|
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
|
||||||
let routed = if data_wait_ms == 0 {
|
let routed = reg
|
||||||
reg.route_nowait(cid, MeResponse::Data { flags, data })
|
.route_with_timeout(cid, MeResponse::Data { flags, data }, route_wait_ms)
|
||||||
.await
|
.await;
|
||||||
} else {
|
|
||||||
reg.route_with_timeout(cid, MeResponse::Data { flags, data }, data_wait_ms)
|
|
||||||
.await
|
|
||||||
};
|
|
||||||
if !matches!(routed, RouteResult::Routed) {
|
if !matches!(routed, RouteResult::Routed) {
|
||||||
match routed {
|
match routed {
|
||||||
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
|
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue