Merge branch 'flow' into flow

This commit is contained in:
Alexey 2026-03-22 15:50:21 +03:00 committed by GitHub
commit 3011a9ef6d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1369 additions and 65 deletions

View File

@ -174,6 +174,24 @@ pub(super) struct ZeroMiddleProxyData {
pub(super) route_drop_queue_full_total: u64,
pub(super) route_drop_queue_full_base_total: u64,
pub(super) route_drop_queue_full_high_total: u64,
pub(super) d2c_batches_total: u64,
pub(super) d2c_batch_frames_total: u64,
pub(super) d2c_batch_bytes_total: u64,
pub(super) d2c_flush_reason_queue_drain_total: u64,
pub(super) d2c_flush_reason_batch_frames_total: u64,
pub(super) d2c_flush_reason_batch_bytes_total: u64,
pub(super) d2c_flush_reason_max_delay_total: u64,
pub(super) d2c_flush_reason_ack_immediate_total: u64,
pub(super) d2c_flush_reason_close_total: u64,
pub(super) d2c_data_frames_total: u64,
pub(super) d2c_ack_frames_total: u64,
pub(super) d2c_payload_bytes_total: u64,
pub(super) d2c_write_mode_coalesced_total: u64,
pub(super) d2c_write_mode_split_total: u64,
pub(super) d2c_quota_reject_pre_write_total: u64,
pub(super) d2c_quota_reject_post_write_total: u64,
pub(super) d2c_frame_buf_shrink_total: u64,
pub(super) d2c_frame_buf_shrink_bytes_total: u64,
pub(super) socks_kdf_strict_reject_total: u64,
pub(super) socks_kdf_compat_fallback_total: u64,
pub(super) endpoint_quarantine_total: u64,

View File

@ -68,6 +68,25 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
route_drop_queue_full_total: stats.get_me_route_drop_queue_full(),
route_drop_queue_full_base_total: stats.get_me_route_drop_queue_full_base(),
route_drop_queue_full_high_total: stats.get_me_route_drop_queue_full_high(),
d2c_batches_total: stats.get_me_d2c_batches_total(),
d2c_batch_frames_total: stats.get_me_d2c_batch_frames_total(),
d2c_batch_bytes_total: stats.get_me_d2c_batch_bytes_total(),
d2c_flush_reason_queue_drain_total: stats.get_me_d2c_flush_reason_queue_drain_total(),
d2c_flush_reason_batch_frames_total: stats.get_me_d2c_flush_reason_batch_frames_total(),
d2c_flush_reason_batch_bytes_total: stats.get_me_d2c_flush_reason_batch_bytes_total(),
d2c_flush_reason_max_delay_total: stats.get_me_d2c_flush_reason_max_delay_total(),
d2c_flush_reason_ack_immediate_total: stats
.get_me_d2c_flush_reason_ack_immediate_total(),
d2c_flush_reason_close_total: stats.get_me_d2c_flush_reason_close_total(),
d2c_data_frames_total: stats.get_me_d2c_data_frames_total(),
d2c_ack_frames_total: stats.get_me_d2c_ack_frames_total(),
d2c_payload_bytes_total: stats.get_me_d2c_payload_bytes_total(),
d2c_write_mode_coalesced_total: stats.get_me_d2c_write_mode_coalesced_total(),
d2c_write_mode_split_total: stats.get_me_d2c_write_mode_split_total(),
d2c_quota_reject_pre_write_total: stats.get_me_d2c_quota_reject_pre_write_total(),
d2c_quota_reject_post_write_total: stats.get_me_d2c_quota_reject_post_write_total(),
d2c_frame_buf_shrink_total: stats.get_me_d2c_frame_buf_shrink_total(),
d2c_frame_buf_shrink_bytes_total: stats.get_me_d2c_frame_buf_shrink_bytes_total(),
socks_kdf_strict_reject_total: stats.get_me_socks_kdf_strict_reject(),
socks_kdf_compat_fallback_total: stats.get_me_socks_kdf_compat_fallback(),
endpoint_quarantine_total: stats.get_me_endpoint_quarantine_total(),

View File

@ -29,6 +29,8 @@ const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_FRAMES: usize = 32;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES: usize = 128 * 1024;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 500;
const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = true;
const DEFAULT_ME_QUOTA_SOFT_OVERSHOOT_BYTES: u64 = 64 * 1024;
const DEFAULT_ME_D2C_FRAME_BUF_SHRINK_THRESHOLD_BYTES: usize = 256 * 1024;
const DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES: usize = 64 * 1024;
const DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES: usize = 256 * 1024;
const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3;
@ -387,6 +389,14 @@ pub(crate) fn default_me_d2c_ack_flush_immediate() -> bool {
DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE
}
pub(crate) fn default_me_quota_soft_overshoot_bytes() -> u64 {
DEFAULT_ME_QUOTA_SOFT_OVERSHOOT_BYTES
}
pub(crate) fn default_me_d2c_frame_buf_shrink_threshold_bytes() -> usize {
DEFAULT_ME_D2C_FRAME_BUF_SHRINK_THRESHOLD_BYTES
}
pub(crate) fn default_direct_relay_copy_buf_c2s_bytes() -> usize {
DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES
}

View File

@ -106,6 +106,8 @@ pub struct HotFields {
pub me_d2c_flush_batch_max_bytes: usize,
pub me_d2c_flush_batch_max_delay_us: u64,
pub me_d2c_ack_flush_immediate: bool,
pub me_quota_soft_overshoot_bytes: u64,
pub me_d2c_frame_buf_shrink_threshold_bytes: usize,
pub direct_relay_copy_buf_c2s_bytes: usize,
pub direct_relay_copy_buf_s2c_bytes: usize,
pub me_health_interval_ms_unhealthy: u64,
@ -225,6 +227,8 @@ impl HotFields {
me_d2c_flush_batch_max_bytes: cfg.general.me_d2c_flush_batch_max_bytes,
me_d2c_flush_batch_max_delay_us: cfg.general.me_d2c_flush_batch_max_delay_us,
me_d2c_ack_flush_immediate: cfg.general.me_d2c_ack_flush_immediate,
me_quota_soft_overshoot_bytes: cfg.general.me_quota_soft_overshoot_bytes,
me_d2c_frame_buf_shrink_threshold_bytes: cfg.general.me_d2c_frame_buf_shrink_threshold_bytes,
direct_relay_copy_buf_c2s_bytes: cfg.general.direct_relay_copy_buf_c2s_bytes,
direct_relay_copy_buf_s2c_bytes: cfg.general.direct_relay_copy_buf_s2c_bytes,
me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy,
@ -511,6 +515,9 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
cfg.general.me_d2c_flush_batch_max_bytes = new.general.me_d2c_flush_batch_max_bytes;
cfg.general.me_d2c_flush_batch_max_delay_us = new.general.me_d2c_flush_batch_max_delay_us;
cfg.general.me_d2c_ack_flush_immediate = new.general.me_d2c_ack_flush_immediate;
cfg.general.me_quota_soft_overshoot_bytes = new.general.me_quota_soft_overshoot_bytes;
cfg.general.me_d2c_frame_buf_shrink_threshold_bytes =
new.general.me_d2c_frame_buf_shrink_threshold_bytes;
cfg.general.direct_relay_copy_buf_c2s_bytes = new.general.direct_relay_copy_buf_c2s_bytes;
cfg.general.direct_relay_copy_buf_s2c_bytes = new.general.direct_relay_copy_buf_s2c_bytes;
cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy;
@ -1030,15 +1037,20 @@ fn log_changes(
|| old_hot.me_d2c_flush_batch_max_bytes != new_hot.me_d2c_flush_batch_max_bytes
|| old_hot.me_d2c_flush_batch_max_delay_us != new_hot.me_d2c_flush_batch_max_delay_us
|| old_hot.me_d2c_ack_flush_immediate != new_hot.me_d2c_ack_flush_immediate
|| old_hot.me_quota_soft_overshoot_bytes != new_hot.me_quota_soft_overshoot_bytes
|| old_hot.me_d2c_frame_buf_shrink_threshold_bytes
!= new_hot.me_d2c_frame_buf_shrink_threshold_bytes
|| old_hot.direct_relay_copy_buf_c2s_bytes != new_hot.direct_relay_copy_buf_c2s_bytes
|| old_hot.direct_relay_copy_buf_s2c_bytes != new_hot.direct_relay_copy_buf_s2c_bytes
{
info!(
"config reload: relay_tuning: me_d2c_frames={} me_d2c_bytes={} me_d2c_delay_us={} me_ack_flush_immediate={} direct_buf_c2s={} direct_buf_s2c={}",
"config reload: relay_tuning: me_d2c_frames={} me_d2c_bytes={} me_d2c_delay_us={} me_ack_flush_immediate={} me_quota_soft_overshoot_bytes={} me_d2c_frame_buf_shrink_threshold_bytes={} direct_buf_c2s={} direct_buf_s2c={}",
new_hot.me_d2c_flush_batch_max_frames,
new_hot.me_d2c_flush_batch_max_bytes,
new_hot.me_d2c_flush_batch_max_delay_us,
new_hot.me_d2c_ack_flush_immediate,
new_hot.me_quota_soft_overshoot_bytes,
new_hot.me_d2c_frame_buf_shrink_threshold_bytes,
new_hot.direct_relay_copy_buf_c2s_bytes,
new_hot.direct_relay_copy_buf_s2c_bytes,
);

View File

@ -533,6 +533,19 @@ impl ProxyConfig {
));
}
if config.general.me_quota_soft_overshoot_bytes > 16 * 1024 * 1024 {
return Err(ProxyError::Config(
"general.me_quota_soft_overshoot_bytes must be within [0, 16777216]".to_string(),
));
}
if !(4096..=16 * 1024 * 1024).contains(&config.general.me_d2c_frame_buf_shrink_threshold_bytes) {
return Err(ProxyError::Config(
"general.me_d2c_frame_buf_shrink_threshold_bytes must be within [4096, 16777216]"
.to_string(),
));
}
if !(4096..=1024 * 1024).contains(&config.general.direct_relay_copy_buf_c2s_bytes) {
return Err(ProxyError::Config(
"general.direct_relay_copy_buf_c2s_bytes must be within [4096, 1048576]"

View File

@ -468,7 +468,7 @@ pub struct GeneralConfig {
pub me_c2me_send_timeout_ms: u64,
/// Bounded wait in milliseconds for routing ME DATA to per-connection queue.
/// `0` keeps legacy no-wait behavior.
/// `0` keeps non-blocking routing; values >0 enable bounded wait for compatibility.
#[serde(default = "default_me_reader_route_data_wait_ms")]
pub me_reader_route_data_wait_ms: u64,
@ -489,6 +489,14 @@ pub struct GeneralConfig {
#[serde(default = "default_me_d2c_ack_flush_immediate")]
pub me_d2c_ack_flush_immediate: bool,
/// Additional bytes above strict per-user quota allowed in hot-path soft mode.
#[serde(default = "default_me_quota_soft_overshoot_bytes")]
pub me_quota_soft_overshoot_bytes: u64,
/// Shrink threshold for reusable ME->Client frame assembly buffer.
#[serde(default = "default_me_d2c_frame_buf_shrink_threshold_bytes")]
pub me_d2c_frame_buf_shrink_threshold_bytes: usize,
/// Copy buffer size for client->DC direction in direct relay.
#[serde(default = "default_direct_relay_copy_buf_c2s_bytes")]
pub direct_relay_copy_buf_c2s_bytes: usize,
@ -945,6 +953,8 @@ impl Default for GeneralConfig {
me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(),
me_d2c_flush_batch_max_delay_us: default_me_d2c_flush_batch_max_delay_us(),
me_d2c_ack_flush_immediate: default_me_d2c_ack_flush_immediate(),
me_quota_soft_overshoot_bytes: default_me_quota_soft_overshoot_bytes(),
me_d2c_frame_buf_shrink_threshold_bytes: default_me_d2c_frame_buf_shrink_threshold_bytes(),
direct_relay_copy_buf_c2s_bytes: default_direct_relay_copy_buf_c2s_bytes(),
direct_relay_copy_buf_s2c_bytes: default_direct_relay_copy_buf_s2c_bytes(),
me_warmup_stagger_enabled: default_true(),

View File

@ -935,6 +935,462 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batches_total Total DC->Client flush batches"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_batches_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_batches_total {}",
if me_allows_normal {
stats.get_me_d2c_batches_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_frames_total Total DC->Client frames flushed in batches"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_batch_frames_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_total {}",
if me_allows_normal {
stats.get_me_d2c_batch_frames_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_bytes_total Total DC->Client bytes flushed in batches"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_batch_bytes_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_total {}",
if me_allows_normal {
stats.get_me_d2c_batch_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_flush_reason_total DC->Client flush reasons"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_flush_reason_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"queue_drain\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_queue_drain_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"batch_frames\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_batch_frames_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"batch_bytes\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_batch_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"max_delay\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_max_delay_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"ack_immediate\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_ack_immediate_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_reason_total{{reason=\"close\"}} {}",
if me_allows_normal {
stats.get_me_d2c_flush_reason_close_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_data_frames_total DC->Client data frames"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_data_frames_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_data_frames_total {}",
if me_allows_normal {
stats.get_me_d2c_data_frames_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_ack_frames_total DC->Client quick-ack frames"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_ack_frames_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_ack_frames_total {}",
if me_allows_normal {
stats.get_me_d2c_ack_frames_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_payload_bytes_total DC->Client payload bytes before transport framing"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_payload_bytes_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_payload_bytes_total {}",
if me_allows_normal {
stats.get_me_d2c_payload_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_write_mode_total DC->Client writer mode selection"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_write_mode_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_write_mode_total{{mode=\"coalesced\"}} {}",
if me_allows_normal {
stats.get_me_d2c_write_mode_coalesced_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_write_mode_total{{mode=\"split\"}} {}",
if me_allows_normal {
stats.get_me_d2c_write_mode_split_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_quota_reject_total DC->Client quota rejects"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_quota_reject_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_quota_reject_total{{stage=\"pre_write\"}} {}",
if me_allows_normal {
stats.get_me_d2c_quota_reject_pre_write_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_quota_reject_total{{stage=\"post_write\"}} {}",
if me_allows_normal {
stats.get_me_d2c_quota_reject_post_write_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_frame_buf_shrink_total DC->Client reusable frame buffer shrink events"
);
let _ = writeln!(out, "# TYPE telemt_me_d2c_frame_buf_shrink_total counter");
let _ = writeln!(
out,
"telemt_me_d2c_frame_buf_shrink_total {}",
if me_allows_normal {
stats.get_me_d2c_frame_buf_shrink_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_frame_buf_shrink_bytes_total DC->Client reusable frame buffer bytes released"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_frame_buf_shrink_bytes_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_frame_buf_shrink_bytes_total {}",
if me_allows_normal {
stats.get_me_d2c_frame_buf_shrink_bytes_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_frames_bucket_total DC->Client batch frame count buckets"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_frames_bucket_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"1\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_1()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"2_4\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_2_4()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"5_8\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_5_8()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"9_16\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_9_16()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"17_32\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_17_32()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"gt_32\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_frames_bucket_gt_32()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_bytes_bucket_total DC->Client batch byte size buckets"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_bytes_bucket_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"0_1k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_0_1k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"1k_4k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_1k_4k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"4k_16k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_4k_16k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"16k_64k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_16k_64k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"64k_128k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_64k_128k()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"gt_128k\"}} {}",
if me_allows_debug {
stats.get_me_d2c_batch_bytes_bucket_gt_128k()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_flush_duration_us_bucket_total DC->Client flush duration buckets"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_flush_duration_us_bucket_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"0_50\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_0_50()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"51_200\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_51_200()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"201_1000\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_201_1000()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"1001_5000\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_1001_5000()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"5001_20000\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_5001_20000()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"gt_20000\"}} {}",
if me_allows_debug {
stats.get_me_d2c_flush_duration_us_bucket_gt_20000()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_timeout_armed_total DC->Client max-delay timer armed events"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_timeout_armed_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_timeout_armed_total {}",
if me_allows_debug {
stats.get_me_d2c_batch_timeout_armed_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batch_timeout_fired_total DC->Client max-delay timer fired events"
);
let _ = writeln!(
out,
"# TYPE telemt_me_d2c_batch_timeout_fired_total counter"
);
let _ = writeln!(
out,
"telemt_me_d2c_batch_timeout_fired_total {}",
if me_allows_debug {
stats.get_me_d2c_batch_timeout_fired_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_pick_total ME writer-pick outcomes by mode and result"
@ -2145,6 +2601,16 @@ mod tests {
stats.increment_relay_idle_hard_close_total();
stats.increment_relay_pressure_evict_total();
stats.increment_relay_protocol_desync_close_total();
stats.increment_me_d2c_batches_total();
stats.add_me_d2c_batch_frames_total(3);
stats.add_me_d2c_batch_bytes_total(2048);
stats.increment_me_d2c_flush_reason(crate::stats::MeD2cFlushReason::AckImmediate);
stats.increment_me_d2c_data_frames_total();
stats.increment_me_d2c_ack_frames_total();
stats.add_me_d2c_payload_bytes_total(1800);
stats.increment_me_d2c_write_mode(crate::stats::MeD2cWriteMode::Coalesced);
stats.increment_me_d2c_quota_reject_total(crate::stats::MeD2cQuotaRejectStage::PostWrite);
stats.observe_me_d2c_frame_buf_shrink(4096);
stats.increment_user_connects("alice");
stats.increment_user_curr_connects("alice");
stats.add_user_octets_from("alice", 1024);
@ -2184,6 +2650,17 @@ mod tests {
assert!(output.contains("telemt_relay_idle_hard_close_total 1"));
assert!(output.contains("telemt_relay_pressure_evict_total 1"));
assert!(output.contains("telemt_relay_protocol_desync_close_total 1"));
assert!(output.contains("telemt_me_d2c_batches_total 1"));
assert!(output.contains("telemt_me_d2c_batch_frames_total 3"));
assert!(output.contains("telemt_me_d2c_batch_bytes_total 2048"));
assert!(output.contains("telemt_me_d2c_flush_reason_total{reason=\"ack_immediate\"} 1"));
assert!(output.contains("telemt_me_d2c_data_frames_total 1"));
assert!(output.contains("telemt_me_d2c_ack_frames_total 1"));
assert!(output.contains("telemt_me_d2c_payload_bytes_total 1800"));
assert!(output.contains("telemt_me_d2c_write_mode_total{mode=\"coalesced\"} 1"));
assert!(output.contains("telemt_me_d2c_quota_reject_total{stage=\"post_write\"} 1"));
assert!(output.contains("telemt_me_d2c_frame_buf_shrink_total 1"));
assert!(output.contains("telemt_me_d2c_frame_buf_shrink_bytes_total 4096"));
assert!(output.contains("telemt_user_connections_total{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_connections_current{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_octets_from_client{user=\"alice\"} 1024"));
@ -2245,6 +2722,11 @@ mod tests {
assert!(output.contains("# TYPE telemt_relay_idle_hard_close_total counter"));
assert!(output.contains("# TYPE telemt_relay_pressure_evict_total counter"));
assert!(output.contains("# TYPE telemt_relay_protocol_desync_close_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_batches_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_flush_reason_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_write_mode_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_batch_frames_bucket_total counter"));
assert!(output.contains("# TYPE telemt_me_d2c_flush_duration_us_bucket_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!(
output

View File

@ -21,7 +21,7 @@ use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay,
};
use crate::stats::Stats;
use crate::stats::{MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, Stats};
use crate::stream::{BufferPool, CryptoReader, CryptoWriter, PooledBuffer};
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
@ -45,6 +45,8 @@ const C2ME_SEND_TIMEOUT: Duration = Duration::from_millis(50);
const C2ME_SEND_TIMEOUT: Duration = Duration::from_secs(5);
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
const ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR: usize = 2;
const ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES: usize = 128 * 1024;
#[cfg(test)]
const QUOTA_USER_LOCKS_MAX: usize = 64;
#[cfg(not(test))]
@ -214,6 +216,8 @@ struct MeD2cFlushPolicy {
max_bytes: usize,
max_delay: Duration,
ack_flush_immediate: bool,
quota_soft_overshoot_bytes: u64,
frame_buf_shrink_threshold_bytes: usize,
}
#[derive(Clone, Copy)]
@ -284,6 +288,11 @@ impl MeD2cFlushPolicy {
.max(ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN),
max_delay: Duration::from_micros(config.general.me_d2c_flush_batch_max_delay_us),
ack_flush_immediate: config.general.me_d2c_ack_flush_immediate,
quota_soft_overshoot_bytes: config.general.me_quota_soft_overshoot_bytes,
frame_buf_shrink_threshold_bytes: config
.general
.me_d2c_frame_buf_shrink_threshold_bytes
.max(4096),
}
}
}
@ -538,6 +547,76 @@ fn quota_would_be_exceeded_for_user(
})
}
fn quota_soft_cap(limit: u64, overshoot: u64) -> u64 {
limit.saturating_add(overshoot)
}
fn quota_exceeded_for_user_soft(
stats: &Stats,
user: &str,
quota_limit: Option<u64>,
overshoot: u64,
) -> bool {
quota_limit.is_some_and(|quota| stats.get_user_total_octets(user) >= quota_soft_cap(quota, overshoot))
}
fn quota_would_be_exceeded_for_user_soft(
stats: &Stats,
user: &str,
quota_limit: Option<u64>,
bytes: u64,
overshoot: u64,
) -> bool {
quota_limit.is_some_and(|quota| {
let cap = quota_soft_cap(quota, overshoot);
let used = stats.get_user_total_octets(user);
used >= cap || bytes > cap.saturating_sub(used)
})
}
fn classify_me_d2c_flush_reason(
flush_immediately: bool,
batch_frames: usize,
max_frames: usize,
batch_bytes: usize,
max_bytes: usize,
max_delay_fired: bool,
) -> MeD2cFlushReason {
if flush_immediately {
return MeD2cFlushReason::AckImmediate;
}
if batch_frames >= max_frames {
return MeD2cFlushReason::BatchFrames;
}
if batch_bytes >= max_bytes {
return MeD2cFlushReason::BatchBytes;
}
if max_delay_fired {
return MeD2cFlushReason::MaxDelay;
}
MeD2cFlushReason::QueueDrain
}
fn observe_me_d2c_flush_event(
stats: &Stats,
reason: MeD2cFlushReason,
batch_frames: usize,
batch_bytes: usize,
flush_duration_us: Option<u64>,
) {
stats.increment_me_d2c_flush_reason(reason);
if batch_frames > 0 || batch_bytes > 0 {
stats.increment_me_d2c_batches_total();
stats.add_me_d2c_batch_frames_total(batch_frames as u64);
stats.add_me_d2c_batch_bytes_total(batch_bytes as u64);
stats.observe_me_d2c_batch_frames(batch_frames as u64);
stats.observe_me_d2c_batch_bytes(batch_bytes as u64);
}
if let Some(duration_us) = flush_duration_us {
stats.observe_me_d2c_flush_duration_us(duration_us);
}
}
#[cfg(test)]
fn quota_user_lock_test_guard() -> &'static Mutex<()> {
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
@ -774,6 +853,7 @@ where
let mut batch_frames = 0usize;
let mut batch_bytes = 0usize;
let mut flush_immediately;
let mut max_delay_fired = false;
let first_is_downstream_activity =
matches!(&first, MeResponse::Data { .. } | MeResponse::Ack(_));
@ -786,6 +866,7 @@ where
stats_clone.as_ref(),
&user_clone,
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@ -801,7 +882,25 @@ where
flush_immediately = immediate;
}
MeWriterResponseOutcome::Close => {
let flush_started_at = if stats_clone.telemetry_policy().me_level.allows_debug() {
Some(Instant::now())
} else {
None
};
let _ = writer.flush().await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX)) as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
MeD2cFlushReason::Close,
batch_frames,
batch_bytes,
flush_duration_us,
);
return Ok(());
}
}
@ -825,6 +924,7 @@ where
stats_clone.as_ref(),
&user_clone,
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@ -840,7 +940,27 @@ where
flush_immediately |= immediate;
}
MeWriterResponseOutcome::Close => {
let flush_started_at =
if stats_clone.telemetry_policy().me_level.allows_debug() {
Some(Instant::now())
} else {
None
};
let _ = writer.flush().await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX))
as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
MeD2cFlushReason::Close,
batch_frames,
batch_bytes,
flush_duration_us,
);
return Ok(());
}
}
@ -851,6 +971,7 @@ where
&& batch_frames < d2c_flush_policy.max_frames
&& batch_bytes < d2c_flush_policy.max_bytes
{
stats_clone.increment_me_d2c_batch_timeout_armed_total();
match tokio::time::timeout(d2c_flush_policy.max_delay, me_rx_task.recv()).await {
Ok(Some(next)) => {
let next_is_downstream_activity =
@ -864,6 +985,7 @@ where
stats_clone.as_ref(),
&user_clone,
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@ -879,7 +1001,30 @@ where
flush_immediately |= immediate;
}
MeWriterResponseOutcome::Close => {
let flush_started_at = if stats_clone
.telemetry_policy()
.me_level
.allows_debug()
{
Some(Instant::now())
} else {
None
};
let _ = writer.flush().await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX))
as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
MeD2cFlushReason::Close,
batch_frames,
batch_bytes,
flush_duration_us,
);
return Ok(());
}
}
@ -903,6 +1048,7 @@ where
stats_clone.as_ref(),
&user_clone,
quota_limit,
d2c_flush_policy.quota_soft_overshoot_bytes,
bytes_me2c_clone.as_ref(),
conn_id,
d2c_flush_policy.ack_flush_immediate,
@ -918,7 +1064,30 @@ where
flush_immediately |= immediate;
}
MeWriterResponseOutcome::Close => {
let flush_started_at = if stats_clone
.telemetry_policy()
.me_level
.allows_debug()
{
Some(Instant::now())
} else {
None
};
let _ = writer.flush().await;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX))
as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
MeD2cFlushReason::Close,
batch_frames,
batch_bytes,
flush_duration_us,
);
return Ok(());
}
}
@ -928,11 +1097,50 @@ where
debug!(conn_id, "ME channel closed");
return Err(ProxyError::Proxy("ME connection lost".into()));
}
Err(_) => {}
Err(_) => {
max_delay_fired = true;
stats_clone.increment_me_d2c_batch_timeout_fired_total();
}
}
}
let flush_reason = classify_me_d2c_flush_reason(
flush_immediately,
batch_frames,
d2c_flush_policy.max_frames,
batch_bytes,
d2c_flush_policy.max_bytes,
max_delay_fired,
);
let flush_started_at = if stats_clone.telemetry_policy().me_level.allows_debug() {
Some(Instant::now())
} else {
None
};
writer.flush().await.map_err(ProxyError::Io)?;
let flush_duration_us = flush_started_at.map(|started| {
started
.elapsed()
.as_micros()
.min(u128::from(u64::MAX)) as u64
});
observe_me_d2c_flush_event(
stats_clone.as_ref(),
flush_reason,
batch_frames,
batch_bytes,
flush_duration_us,
);
let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes;
let shrink_trigger = shrink_threshold
.saturating_mul(ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR);
if frame_buf.capacity() > shrink_trigger {
let cap_before = frame_buf.capacity();
frame_buf.shrink_to(shrink_threshold);
let cap_after = frame_buf.capacity();
let bytes_freed = cap_before.saturating_sub(cap_after) as u64;
stats_clone.observe_me_d2c_frame_buf_shrink(bytes_freed);
}
}
_ = &mut stop_rx => {
debug!(conn_id, "ME writer stop signal");
@ -1482,6 +1690,7 @@ async fn process_me_writer_response<W>(
stats: &Stats,
user: &str,
quota_limit: Option<u64>,
quota_soft_overshoot_bytes: u64,
bytes_me2c: &AtomicU64,
conn_id: u64,
ack_flush_immediate: bool,
@ -1498,31 +1707,39 @@ where
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
}
let data_len = data.len() as u64;
if let Some(limit) = quota_limit {
let quota_lock = quota_user_lock(user);
let _quota_guard = quota_lock.lock().await;
if quota_would_be_exceeded_for_user(stats, user, Some(limit), data_len) {
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
if quota_would_be_exceeded_for_user_soft(
stats,
user,
quota_limit,
data_len,
quota_soft_overshoot_bytes,
) {
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite);
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
let write_mode =
write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
.await?;
stats.increment_me_d2c_write_mode(write_mode);
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
stats.add_user_octets_to(user, data.len() as u64);
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
stats.add_user_octets_to(user, data.len() as u64);
stats.increment_me_d2c_data_frames_total();
stats.add_me_d2c_payload_bytes_total(data.len() as u64);
if quota_exceeded_for_user(stats, user, Some(limit)) {
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
} else {
write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
.await?;
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
stats.add_user_octets_to(user, data.len() as u64);
if quota_exceeded_for_user_soft(
stats,
user,
quota_limit,
quota_soft_overshoot_bytes,
) {
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PostWrite);
return Err(ProxyError::DataQuotaExceeded {
user: user.to_string(),
});
}
Ok(MeWriterResponseOutcome::Continue {
@ -1538,6 +1755,7 @@ where
trace!(conn_id, confirm, "ME->C quickack");
}
write_client_ack(client_writer, proto_tag, confirm).await?;
stats.increment_me_d2c_ack_frames_total();
Ok(MeWriterResponseOutcome::Continue {
frames: 1,
@ -1588,13 +1806,13 @@ async fn write_client_payload<W>(
data: &[u8],
rng: &SecureRandom,
frame_buf: &mut Vec<u8>,
) -> Result<()>
) -> Result<MeD2cWriteMode>
where
W: AsyncWrite + Unpin + Send + 'static,
{
let quickack = (flags & RPC_FLAG_QUICKACK) != 0;
match proto_tag {
let write_mode = match proto_tag {
ProtoTag::Abridged => {
if !data.len().is_multiple_of(4) {
return Err(ProxyError::Proxy(format!(
@ -1609,28 +1827,46 @@ where
if quickack {
first |= 0x80;
}
frame_buf.clear();
frame_buf.reserve(1 + data.len());
frame_buf.push(first);
frame_buf.extend_from_slice(data);
client_writer
.write_all(frame_buf)
.await
.map_err(ProxyError::Io)?;
let wire_len = 1usize.saturating_add(data.len());
if wire_len <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES {
frame_buf.clear();
frame_buf.reserve(wire_len);
frame_buf.push(first);
frame_buf.extend_from_slice(data);
client_writer
.write_all(frame_buf.as_slice())
.await
.map_err(ProxyError::Io)?;
MeD2cWriteMode::Coalesced
} else {
let header = [first];
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
MeD2cWriteMode::Split
}
} else if len_words < (1 << 24) {
let mut first = 0x7fu8;
if quickack {
first |= 0x80;
}
let lw = (len_words as u32).to_le_bytes();
frame_buf.clear();
frame_buf.reserve(4 + data.len());
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
frame_buf.extend_from_slice(data);
client_writer
.write_all(frame_buf)
.await
.map_err(ProxyError::Io)?;
let wire_len = 4usize.saturating_add(data.len());
if wire_len <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES {
frame_buf.clear();
frame_buf.reserve(wire_len);
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
frame_buf.extend_from_slice(data);
client_writer
.write_all(frame_buf.as_slice())
.await
.map_err(ProxyError::Io)?;
MeD2cWriteMode::Coalesced
} else {
let header = [first, lw[0], lw[1], lw[2]];
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
MeD2cWriteMode::Split
}
} else {
return Err(ProxyError::Proxy(format!(
"Abridged frame too large: {}",
@ -1650,25 +1886,46 @@ where
} else {
0
};
let (len_val, total) =
compute_intermediate_secure_wire_len(data.len(), padding_len, quickack)?;
frame_buf.clear();
frame_buf.reserve(total);
frame_buf.extend_from_slice(&len_val.to_le_bytes());
frame_buf.extend_from_slice(data);
if padding_len > 0 {
let start = frame_buf.len();
frame_buf.resize(start + padding_len, 0);
rng.fill(&mut frame_buf[start..]);
if total <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES {
frame_buf.clear();
frame_buf.reserve(total);
frame_buf.extend_from_slice(&len_val.to_le_bytes());
frame_buf.extend_from_slice(data);
if padding_len > 0 {
let start = frame_buf.len();
frame_buf.resize(start + padding_len, 0);
rng.fill(&mut frame_buf[start..]);
}
client_writer
.write_all(frame_buf.as_slice())
.await
.map_err(ProxyError::Io)?;
MeD2cWriteMode::Coalesced
} else {
let header = len_val.to_le_bytes();
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
if padding_len > 0 {
frame_buf.clear();
if frame_buf.capacity() < padding_len {
frame_buf.reserve(padding_len);
}
frame_buf.resize(padding_len, 0);
rng.fill(frame_buf.as_mut_slice());
client_writer
.write_all(frame_buf.as_slice())
.await
.map_err(ProxyError::Io)?;
}
MeD2cWriteMode::Split
}
client_writer
.write_all(frame_buf)
.await
.map_err(ProxyError::Io)?;
}
}
};
Ok(())
Ok(write_mode)
}
async fn write_client_ack<W>(

View File

@ -1540,6 +1540,7 @@ async fn process_me_writer_response_ack_obeys_flush_policy() {
&stats,
"user",
None,
0,
&bytes_me2c,
77,
true,
@ -1566,6 +1567,7 @@ async fn process_me_writer_response_ack_obeys_flush_policy() {
&stats,
"user",
None,
0,
&bytes_me2c,
77,
false,
@ -1606,6 +1608,7 @@ async fn process_me_writer_response_data_updates_byte_accounting() {
&stats,
"user",
None,
0,
&bytes_me2c,
88,
false,
@ -1652,6 +1655,7 @@ async fn process_me_writer_response_data_enforces_live_user_quota() {
&stats,
"quota-user",
Some(12),
0,
&bytes_me2c,
89,
false,
@ -1700,6 +1704,7 @@ async fn process_me_writer_response_concurrent_same_user_quota_does_not_overshoo
&stats,
user,
Some(1),
0,
&bytes_me2c,
91,
false,
@ -1717,6 +1722,7 @@ async fn process_me_writer_response_concurrent_same_user_quota_does_not_overshoo
&stats,
user,
Some(1),
0,
&bytes_me2c,
92,
false,
@ -1765,6 +1771,7 @@ async fn process_me_writer_response_data_does_not_forward_partial_payload_when_r
&stats,
"partial-quota-user",
Some(4),
0,
&bytes_me2c,
90,
false,
@ -1970,6 +1977,7 @@ async fn run_quota_race_attempt(
stats,
user,
Some(1),
0,
bytes_me2c,
conn_id,
false,

View File

@ -26,6 +26,28 @@ enum RouteConnectionGauge {
Middle,
}
#[derive(Debug, Clone, Copy)]
pub enum MeD2cFlushReason {
QueueDrain,
BatchFrames,
BatchBytes,
MaxDelay,
AckImmediate,
Close,
}
#[derive(Debug, Clone, Copy)]
pub enum MeD2cWriteMode {
Coalesced,
Split,
}
#[derive(Debug, Clone, Copy)]
pub enum MeD2cQuotaRejectStage {
PreWrite,
PostWrite,
}
#[must_use = "RouteConnectionLease must be kept alive to hold the connection gauge increment"]
pub struct RouteConnectionLease {
stats: Arc<Stats>,
@ -140,6 +162,44 @@ pub struct Stats {
me_route_drop_queue_full: AtomicU64,
me_route_drop_queue_full_base: AtomicU64,
me_route_drop_queue_full_high: AtomicU64,
me_d2c_batches_total: AtomicU64,
me_d2c_batch_frames_total: AtomicU64,
me_d2c_batch_bytes_total: AtomicU64,
me_d2c_flush_reason_queue_drain_total: AtomicU64,
me_d2c_flush_reason_batch_frames_total: AtomicU64,
me_d2c_flush_reason_batch_bytes_total: AtomicU64,
me_d2c_flush_reason_max_delay_total: AtomicU64,
me_d2c_flush_reason_ack_immediate_total: AtomicU64,
me_d2c_flush_reason_close_total: AtomicU64,
me_d2c_data_frames_total: AtomicU64,
me_d2c_ack_frames_total: AtomicU64,
me_d2c_payload_bytes_total: AtomicU64,
me_d2c_write_mode_coalesced_total: AtomicU64,
me_d2c_write_mode_split_total: AtomicU64,
me_d2c_quota_reject_pre_write_total: AtomicU64,
me_d2c_quota_reject_post_write_total: AtomicU64,
me_d2c_frame_buf_shrink_total: AtomicU64,
me_d2c_frame_buf_shrink_bytes_total: AtomicU64,
me_d2c_batch_frames_bucket_1: AtomicU64,
me_d2c_batch_frames_bucket_2_4: AtomicU64,
me_d2c_batch_frames_bucket_5_8: AtomicU64,
me_d2c_batch_frames_bucket_9_16: AtomicU64,
me_d2c_batch_frames_bucket_17_32: AtomicU64,
me_d2c_batch_frames_bucket_gt_32: AtomicU64,
me_d2c_batch_bytes_bucket_0_1k: AtomicU64,
me_d2c_batch_bytes_bucket_1k_4k: AtomicU64,
me_d2c_batch_bytes_bucket_4k_16k: AtomicU64,
me_d2c_batch_bytes_bucket_16k_64k: AtomicU64,
me_d2c_batch_bytes_bucket_64k_128k: AtomicU64,
me_d2c_batch_bytes_bucket_gt_128k: AtomicU64,
me_d2c_flush_duration_us_bucket_0_50: AtomicU64,
me_d2c_flush_duration_us_bucket_51_200: AtomicU64,
me_d2c_flush_duration_us_bucket_201_1000: AtomicU64,
me_d2c_flush_duration_us_bucket_1001_5000: AtomicU64,
me_d2c_flush_duration_us_bucket_5001_20000: AtomicU64,
me_d2c_flush_duration_us_bucket_gt_20000: AtomicU64,
me_d2c_batch_timeout_armed_total: AtomicU64,
me_d2c_batch_timeout_fired_total: AtomicU64,
me_writer_pick_sorted_rr_success_try_total: AtomicU64,
me_writer_pick_sorted_rr_success_fallback_total: AtomicU64,
me_writer_pick_sorted_rr_full_total: AtomicU64,
@ -594,6 +654,215 @@ impl Stats {
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_d2c_batches_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_d2c_batches_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn add_me_d2c_batch_frames_total(&self, frames: u64) {
if self.telemetry_me_allows_normal() {
self.me_d2c_batch_frames_total
.fetch_add(frames, Ordering::Relaxed);
}
}
pub fn add_me_d2c_batch_bytes_total(&self, bytes: u64) {
if self.telemetry_me_allows_normal() {
self.me_d2c_batch_bytes_total
.fetch_add(bytes, Ordering::Relaxed);
}
}
pub fn increment_me_d2c_flush_reason(&self, reason: MeD2cFlushReason) {
if !self.telemetry_me_allows_normal() {
return;
}
match reason {
MeD2cFlushReason::QueueDrain => {
self.me_d2c_flush_reason_queue_drain_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cFlushReason::BatchFrames => {
self.me_d2c_flush_reason_batch_frames_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cFlushReason::BatchBytes => {
self.me_d2c_flush_reason_batch_bytes_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cFlushReason::MaxDelay => {
self.me_d2c_flush_reason_max_delay_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cFlushReason::AckImmediate => {
self.me_d2c_flush_reason_ack_immediate_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cFlushReason::Close => {
self.me_d2c_flush_reason_close_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_d2c_data_frames_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_d2c_data_frames_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_d2c_ack_frames_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_d2c_ack_frames_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn add_me_d2c_payload_bytes_total(&self, bytes: u64) {
if self.telemetry_me_allows_normal() {
self.me_d2c_payload_bytes_total
.fetch_add(bytes, Ordering::Relaxed);
}
}
pub fn increment_me_d2c_write_mode(&self, mode: MeD2cWriteMode) {
if !self.telemetry_me_allows_normal() {
return;
}
match mode {
MeD2cWriteMode::Coalesced => {
self.me_d2c_write_mode_coalesced_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cWriteMode::Split => {
self.me_d2c_write_mode_split_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_d2c_quota_reject_total(&self, stage: MeD2cQuotaRejectStage) {
if !self.telemetry_me_allows_normal() {
return;
}
match stage {
MeD2cQuotaRejectStage::PreWrite => {
self.me_d2c_quota_reject_pre_write_total
.fetch_add(1, Ordering::Relaxed);
}
MeD2cQuotaRejectStage::PostWrite => {
self.me_d2c_quota_reject_post_write_total
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn observe_me_d2c_frame_buf_shrink(&self, bytes_freed: u64) {
if !self.telemetry_me_allows_normal() {
return;
}
self.me_d2c_frame_buf_shrink_total
.fetch_add(1, Ordering::Relaxed);
self.me_d2c_frame_buf_shrink_bytes_total
.fetch_add(bytes_freed, Ordering::Relaxed);
}
pub fn observe_me_d2c_batch_frames(&self, frames: u64) {
if !self.telemetry_me_allows_debug() {
return;
}
match frames {
0 => {}
1 => {
self.me_d2c_batch_frames_bucket_1
.fetch_add(1, Ordering::Relaxed);
}
2..=4 => {
self.me_d2c_batch_frames_bucket_2_4
.fetch_add(1, Ordering::Relaxed);
}
5..=8 => {
self.me_d2c_batch_frames_bucket_5_8
.fetch_add(1, Ordering::Relaxed);
}
9..=16 => {
self.me_d2c_batch_frames_bucket_9_16
.fetch_add(1, Ordering::Relaxed);
}
17..=32 => {
self.me_d2c_batch_frames_bucket_17_32
.fetch_add(1, Ordering::Relaxed);
}
_ => {
self.me_d2c_batch_frames_bucket_gt_32
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn observe_me_d2c_batch_bytes(&self, bytes: u64) {
if !self.telemetry_me_allows_debug() {
return;
}
match bytes {
0..=1024 => {
self.me_d2c_batch_bytes_bucket_0_1k
.fetch_add(1, Ordering::Relaxed);
}
1025..=4096 => {
self.me_d2c_batch_bytes_bucket_1k_4k
.fetch_add(1, Ordering::Relaxed);
}
4097..=16_384 => {
self.me_d2c_batch_bytes_bucket_4k_16k
.fetch_add(1, Ordering::Relaxed);
}
16_385..=65_536 => {
self.me_d2c_batch_bytes_bucket_16k_64k
.fetch_add(1, Ordering::Relaxed);
}
65_537..=131_072 => {
self.me_d2c_batch_bytes_bucket_64k_128k
.fetch_add(1, Ordering::Relaxed);
}
_ => {
self.me_d2c_batch_bytes_bucket_gt_128k
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn observe_me_d2c_flush_duration_us(&self, duration_us: u64) {
if !self.telemetry_me_allows_debug() {
return;
}
match duration_us {
0..=50 => {
self.me_d2c_flush_duration_us_bucket_0_50
.fetch_add(1, Ordering::Relaxed);
}
51..=200 => {
self.me_d2c_flush_duration_us_bucket_51_200
.fetch_add(1, Ordering::Relaxed);
}
201..=1000 => {
self.me_d2c_flush_duration_us_bucket_201_1000
.fetch_add(1, Ordering::Relaxed);
}
1001..=5000 => {
self.me_d2c_flush_duration_us_bucket_1001_5000
.fetch_add(1, Ordering::Relaxed);
}
5001..=20_000 => {
self.me_d2c_flush_duration_us_bucket_5001_20000
.fetch_add(1, Ordering::Relaxed);
}
_ => {
self.me_d2c_flush_duration_us_bucket_gt_20000
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn increment_me_d2c_batch_timeout_armed_total(&self) {
if self.telemetry_me_allows_debug() {
self.me_d2c_batch_timeout_armed_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_d2c_batch_timeout_fired_total(&self) {
if self.telemetry_me_allows_debug() {
self.me_d2c_batch_timeout_fired_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_pick_success_try_total(&self, mode: MeWriterPickMode) {
if !self.telemetry_me_allows_normal() {
return;
@ -1229,6 +1498,142 @@ impl Stats {
pub fn get_me_route_drop_queue_full_high(&self) -> u64 {
self.me_route_drop_queue_full_high.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batches_total(&self) -> u64 {
self.me_d2c_batches_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_total(&self) -> u64 {
self.me_d2c_batch_frames_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_total(&self) -> u64 {
self.me_d2c_batch_bytes_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_queue_drain_total(&self) -> u64 {
self.me_d2c_flush_reason_queue_drain_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_batch_frames_total(&self) -> u64 {
self.me_d2c_flush_reason_batch_frames_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_batch_bytes_total(&self) -> u64 {
self.me_d2c_flush_reason_batch_bytes_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_max_delay_total(&self) -> u64 {
self.me_d2c_flush_reason_max_delay_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_ack_immediate_total(&self) -> u64 {
self.me_d2c_flush_reason_ack_immediate_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_reason_close_total(&self) -> u64 {
self.me_d2c_flush_reason_close_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_data_frames_total(&self) -> u64 {
self.me_d2c_data_frames_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_ack_frames_total(&self) -> u64 {
self.me_d2c_ack_frames_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_payload_bytes_total(&self) -> u64 {
self.me_d2c_payload_bytes_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_write_mode_coalesced_total(&self) -> u64 {
self.me_d2c_write_mode_coalesced_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_write_mode_split_total(&self) -> u64 {
self.me_d2c_write_mode_split_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_quota_reject_pre_write_total(&self) -> u64 {
self.me_d2c_quota_reject_pre_write_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_quota_reject_post_write_total(&self) -> u64 {
self.me_d2c_quota_reject_post_write_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_frame_buf_shrink_total(&self) -> u64 {
self.me_d2c_frame_buf_shrink_total.load(Ordering::Relaxed)
}
pub fn get_me_d2c_frame_buf_shrink_bytes_total(&self) -> u64 {
self.me_d2c_frame_buf_shrink_bytes_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_1(&self) -> u64 {
self.me_d2c_batch_frames_bucket_1.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_2_4(&self) -> u64 {
self.me_d2c_batch_frames_bucket_2_4.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_5_8(&self) -> u64 {
self.me_d2c_batch_frames_bucket_5_8.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_9_16(&self) -> u64 {
self.me_d2c_batch_frames_bucket_9_16.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_17_32(&self) -> u64 {
self.me_d2c_batch_frames_bucket_17_32
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_frames_bucket_gt_32(&self) -> u64 {
self.me_d2c_batch_frames_bucket_gt_32
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_0_1k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_0_1k.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_1k_4k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_1k_4k.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_4k_16k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_4k_16k.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_16k_64k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_16k_64k
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_64k_128k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_64k_128k
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_bytes_bucket_gt_128k(&self) -> u64 {
self.me_d2c_batch_bytes_bucket_gt_128k
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_0_50(&self) -> u64 {
self.me_d2c_flush_duration_us_bucket_0_50
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_51_200(&self) -> u64 {
self.me_d2c_flush_duration_us_bucket_51_200
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_201_1000(&self) -> u64 {
self.me_d2c_flush_duration_us_bucket_201_1000
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_1001_5000(&self) -> u64 {
self.me_d2c_flush_duration_us_bucket_1001_5000
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_5001_20000(&self) -> u64 {
self.me_d2c_flush_duration_us_bucket_5001_20000
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_flush_duration_us_bucket_gt_20000(&self) -> u64 {
self.me_d2c_flush_duration_us_bucket_gt_20000
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_timeout_armed_total(&self) -> u64 {
self.me_d2c_batch_timeout_armed_total
.load(Ordering::Relaxed)
}
pub fn get_me_d2c_batch_timeout_fired_total(&self) -> u64 {
self.me_d2c_batch_timeout_fired_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_pick_sorted_rr_success_try_total(&self) -> u64 {
self.me_writer_pick_sorted_rr_success_try_total
.load(Ordering::Relaxed)
@ -1898,9 +2303,83 @@ mod tests {
stats.increment_me_crc_mismatch();
stats.increment_me_keepalive_sent();
stats.increment_me_route_drop_queue_full();
stats.increment_me_d2c_batches_total();
stats.add_me_d2c_batch_frames_total(4);
stats.add_me_d2c_batch_bytes_total(4096);
stats.increment_me_d2c_flush_reason(MeD2cFlushReason::BatchBytes);
stats.increment_me_d2c_write_mode(MeD2cWriteMode::Coalesced);
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite);
stats.observe_me_d2c_frame_buf_shrink(1024);
stats.observe_me_d2c_batch_frames(4);
stats.observe_me_d2c_batch_bytes(4096);
stats.observe_me_d2c_flush_duration_us(120);
stats.increment_me_d2c_batch_timeout_armed_total();
stats.increment_me_d2c_batch_timeout_fired_total();
assert_eq!(stats.get_me_crc_mismatch(), 0);
assert_eq!(stats.get_me_keepalive_sent(), 0);
assert_eq!(stats.get_me_route_drop_queue_full(), 0);
assert_eq!(stats.get_me_d2c_batches_total(), 0);
assert_eq!(stats.get_me_d2c_flush_reason_batch_bytes_total(), 0);
assert_eq!(stats.get_me_d2c_write_mode_coalesced_total(), 0);
assert_eq!(stats.get_me_d2c_quota_reject_pre_write_total(), 0);
assert_eq!(stats.get_me_d2c_frame_buf_shrink_total(), 0);
assert_eq!(stats.get_me_d2c_batch_frames_bucket_2_4(), 0);
assert_eq!(stats.get_me_d2c_batch_bytes_bucket_1k_4k(), 0);
assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_51_200(), 0);
assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 0);
assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 0);
}
#[test]
fn test_telemetry_policy_me_normal_blocks_d2c_debug_metrics() {
let stats = Stats::new();
stats.apply_telemetry_policy(TelemetryPolicy {
core_enabled: true,
user_enabled: true,
me_level: MeTelemetryLevel::Normal,
});
stats.increment_me_d2c_batches_total();
stats.add_me_d2c_batch_frames_total(2);
stats.add_me_d2c_batch_bytes_total(2048);
stats.increment_me_d2c_flush_reason(MeD2cFlushReason::QueueDrain);
stats.observe_me_d2c_batch_frames(2);
stats.observe_me_d2c_batch_bytes(2048);
stats.observe_me_d2c_flush_duration_us(100);
stats.increment_me_d2c_batch_timeout_armed_total();
stats.increment_me_d2c_batch_timeout_fired_total();
assert_eq!(stats.get_me_d2c_batches_total(), 1);
assert_eq!(stats.get_me_d2c_batch_frames_total(), 2);
assert_eq!(stats.get_me_d2c_batch_bytes_total(), 2048);
assert_eq!(stats.get_me_d2c_flush_reason_queue_drain_total(), 1);
assert_eq!(stats.get_me_d2c_batch_frames_bucket_2_4(), 0);
assert_eq!(stats.get_me_d2c_batch_bytes_bucket_1k_4k(), 0);
assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_51_200(), 0);
assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 0);
assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 0);
}
#[test]
fn test_telemetry_policy_me_debug_enables_d2c_debug_metrics() {
let stats = Stats::new();
stats.apply_telemetry_policy(TelemetryPolicy {
core_enabled: true,
user_enabled: true,
me_level: MeTelemetryLevel::Debug,
});
stats.observe_me_d2c_batch_frames(7);
stats.observe_me_d2c_batch_bytes(70_000);
stats.observe_me_d2c_flush_duration_us(1400);
stats.increment_me_d2c_batch_timeout_armed_total();
stats.increment_me_d2c_batch_timeout_fired_total();
assert_eq!(stats.get_me_d2c_batch_frames_bucket_5_8(), 1);
assert_eq!(stats.get_me_d2c_batch_bytes_bucket_64k_128k(), 1);
assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_1001_5000(), 1);
assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 1);
assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 1);
}
#[test]

View File

@ -126,14 +126,10 @@ pub(crate) async fn reader_loop(
let data = body.slice(12..);
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
let data_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
let routed = if data_wait_ms == 0 {
reg.route_nowait(cid, MeResponse::Data { flags, data })
.await
} else {
reg.route_with_timeout(cid, MeResponse::Data { flags, data }, data_wait_ms)
.await
};
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
let routed = reg
.route_with_timeout(cid, MeResponse::Data { flags, data }, route_wait_ms)
.await;
if !matches!(routed, RouteResult::Routed) {
match routed {
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),