mirror of https://github.com/telemt/telemt.git
DC -> Client Runtime in Metrics and API
This commit is contained in:
parent
0461bc65c6
commit
7b570be5b3
|
|
@ -174,6 +174,24 @@ pub(super) struct ZeroMiddleProxyData {
|
||||||
pub(super) route_drop_queue_full_total: u64,
|
pub(super) route_drop_queue_full_total: u64,
|
||||||
pub(super) route_drop_queue_full_base_total: u64,
|
pub(super) route_drop_queue_full_base_total: u64,
|
||||||
pub(super) route_drop_queue_full_high_total: u64,
|
pub(super) route_drop_queue_full_high_total: u64,
|
||||||
|
pub(super) d2c_batches_total: u64,
|
||||||
|
pub(super) d2c_batch_frames_total: u64,
|
||||||
|
pub(super) d2c_batch_bytes_total: u64,
|
||||||
|
pub(super) d2c_flush_reason_queue_drain_total: u64,
|
||||||
|
pub(super) d2c_flush_reason_batch_frames_total: u64,
|
||||||
|
pub(super) d2c_flush_reason_batch_bytes_total: u64,
|
||||||
|
pub(super) d2c_flush_reason_max_delay_total: u64,
|
||||||
|
pub(super) d2c_flush_reason_ack_immediate_total: u64,
|
||||||
|
pub(super) d2c_flush_reason_close_total: u64,
|
||||||
|
pub(super) d2c_data_frames_total: u64,
|
||||||
|
pub(super) d2c_ack_frames_total: u64,
|
||||||
|
pub(super) d2c_payload_bytes_total: u64,
|
||||||
|
pub(super) d2c_write_mode_coalesced_total: u64,
|
||||||
|
pub(super) d2c_write_mode_split_total: u64,
|
||||||
|
pub(super) d2c_quota_reject_pre_write_total: u64,
|
||||||
|
pub(super) d2c_quota_reject_post_write_total: u64,
|
||||||
|
pub(super) d2c_frame_buf_shrink_total: u64,
|
||||||
|
pub(super) d2c_frame_buf_shrink_bytes_total: u64,
|
||||||
pub(super) socks_kdf_strict_reject_total: u64,
|
pub(super) socks_kdf_strict_reject_total: u64,
|
||||||
pub(super) socks_kdf_compat_fallback_total: u64,
|
pub(super) socks_kdf_compat_fallback_total: u64,
|
||||||
pub(super) endpoint_quarantine_total: u64,
|
pub(super) endpoint_quarantine_total: u64,
|
||||||
|
|
|
||||||
|
|
@ -68,6 +68,25 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
|
||||||
route_drop_queue_full_total: stats.get_me_route_drop_queue_full(),
|
route_drop_queue_full_total: stats.get_me_route_drop_queue_full(),
|
||||||
route_drop_queue_full_base_total: stats.get_me_route_drop_queue_full_base(),
|
route_drop_queue_full_base_total: stats.get_me_route_drop_queue_full_base(),
|
||||||
route_drop_queue_full_high_total: stats.get_me_route_drop_queue_full_high(),
|
route_drop_queue_full_high_total: stats.get_me_route_drop_queue_full_high(),
|
||||||
|
d2c_batches_total: stats.get_me_d2c_batches_total(),
|
||||||
|
d2c_batch_frames_total: stats.get_me_d2c_batch_frames_total(),
|
||||||
|
d2c_batch_bytes_total: stats.get_me_d2c_batch_bytes_total(),
|
||||||
|
d2c_flush_reason_queue_drain_total: stats.get_me_d2c_flush_reason_queue_drain_total(),
|
||||||
|
d2c_flush_reason_batch_frames_total: stats.get_me_d2c_flush_reason_batch_frames_total(),
|
||||||
|
d2c_flush_reason_batch_bytes_total: stats.get_me_d2c_flush_reason_batch_bytes_total(),
|
||||||
|
d2c_flush_reason_max_delay_total: stats.get_me_d2c_flush_reason_max_delay_total(),
|
||||||
|
d2c_flush_reason_ack_immediate_total: stats
|
||||||
|
.get_me_d2c_flush_reason_ack_immediate_total(),
|
||||||
|
d2c_flush_reason_close_total: stats.get_me_d2c_flush_reason_close_total(),
|
||||||
|
d2c_data_frames_total: stats.get_me_d2c_data_frames_total(),
|
||||||
|
d2c_ack_frames_total: stats.get_me_d2c_ack_frames_total(),
|
||||||
|
d2c_payload_bytes_total: stats.get_me_d2c_payload_bytes_total(),
|
||||||
|
d2c_write_mode_coalesced_total: stats.get_me_d2c_write_mode_coalesced_total(),
|
||||||
|
d2c_write_mode_split_total: stats.get_me_d2c_write_mode_split_total(),
|
||||||
|
d2c_quota_reject_pre_write_total: stats.get_me_d2c_quota_reject_pre_write_total(),
|
||||||
|
d2c_quota_reject_post_write_total: stats.get_me_d2c_quota_reject_post_write_total(),
|
||||||
|
d2c_frame_buf_shrink_total: stats.get_me_d2c_frame_buf_shrink_total(),
|
||||||
|
d2c_frame_buf_shrink_bytes_total: stats.get_me_d2c_frame_buf_shrink_bytes_total(),
|
||||||
socks_kdf_strict_reject_total: stats.get_me_socks_kdf_strict_reject(),
|
socks_kdf_strict_reject_total: stats.get_me_socks_kdf_strict_reject(),
|
||||||
socks_kdf_compat_fallback_total: stats.get_me_socks_kdf_compat_fallback(),
|
socks_kdf_compat_fallback_total: stats.get_me_socks_kdf_compat_fallback(),
|
||||||
endpoint_quarantine_total: stats.get_me_endpoint_quarantine_total(),
|
endpoint_quarantine_total: stats.get_me_endpoint_quarantine_total(),
|
||||||
|
|
|
||||||
482
src/metrics.rs
482
src/metrics.rs
|
|
@ -935,6 +935,462 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_batches_total Total DC->Client flush batches"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_d2c_batches_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batches_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_batches_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_batch_frames_total Total DC->Client frames flushed in batches"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_d2c_batch_frames_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_frames_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_batch_frames_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_batch_bytes_total Total DC->Client bytes flushed in batches"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_d2c_batch_bytes_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_bytes_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_batch_bytes_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_flush_reason_total DC->Client flush reasons"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_d2c_flush_reason_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_flush_reason_total{{reason=\"queue_drain\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_flush_reason_queue_drain_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_flush_reason_total{{reason=\"batch_frames\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_flush_reason_batch_frames_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_flush_reason_total{{reason=\"batch_bytes\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_flush_reason_batch_bytes_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_flush_reason_total{{reason=\"max_delay\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_flush_reason_max_delay_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_flush_reason_total{{reason=\"ack_immediate\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_flush_reason_ack_immediate_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_flush_reason_total{{reason=\"close\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_flush_reason_close_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_data_frames_total DC->Client data frames"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_d2c_data_frames_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_data_frames_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_data_frames_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_ack_frames_total DC->Client quick-ack frames"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_d2c_ack_frames_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_ack_frames_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_ack_frames_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_payload_bytes_total DC->Client payload bytes before transport framing"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_d2c_payload_bytes_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_payload_bytes_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_payload_bytes_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_write_mode_total DC->Client writer mode selection"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_d2c_write_mode_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_write_mode_total{{mode=\"coalesced\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_write_mode_coalesced_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_write_mode_total{{mode=\"split\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_write_mode_split_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_quota_reject_total DC->Client quota rejects"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_d2c_quota_reject_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_quota_reject_total{{stage=\"pre_write\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_quota_reject_pre_write_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_quota_reject_total{{stage=\"post_write\"}} {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_quota_reject_post_write_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_frame_buf_shrink_total DC->Client reusable frame buffer shrink events"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_d2c_frame_buf_shrink_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_frame_buf_shrink_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_frame_buf_shrink_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_frame_buf_shrink_bytes_total DC->Client reusable frame buffer bytes released"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_d2c_frame_buf_shrink_bytes_total counter"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_frame_buf_shrink_bytes_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_d2c_frame_buf_shrink_bytes_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_batch_frames_bucket_total DC->Client batch frame count buckets"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_d2c_batch_frames_bucket_total counter"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"1\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_frames_bucket_1()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"2_4\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_frames_bucket_2_4()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"5_8\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_frames_bucket_5_8()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"9_16\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_frames_bucket_9_16()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"17_32\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_frames_bucket_17_32()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_frames_bucket_total{{bucket=\"gt_32\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_frames_bucket_gt_32()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_batch_bytes_bucket_total DC->Client batch byte size buckets"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_d2c_batch_bytes_bucket_total counter"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"0_1k\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_bytes_bucket_0_1k()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"1k_4k\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_bytes_bucket_1k_4k()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"4k_16k\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_bytes_bucket_4k_16k()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"16k_64k\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_bytes_bucket_16k_64k()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"64k_128k\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_bytes_bucket_64k_128k()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"gt_128k\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_bytes_bucket_gt_128k()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_flush_duration_us_bucket_total DC->Client flush duration buckets"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_d2c_flush_duration_us_bucket_total counter"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"0_50\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_flush_duration_us_bucket_0_50()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"51_200\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_flush_duration_us_bucket_51_200()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"201_1000\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_flush_duration_us_bucket_201_1000()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"1001_5000\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_flush_duration_us_bucket_1001_5000()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"5001_20000\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_flush_duration_us_bucket_5001_20000()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"gt_20000\"}} {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_flush_duration_us_bucket_gt_20000()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_batch_timeout_armed_total DC->Client max-delay timer armed events"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_d2c_batch_timeout_armed_total counter"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_timeout_armed_total {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_timeout_armed_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_d2c_batch_timeout_fired_total DC->Client max-delay timer fired events"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_d2c_batch_timeout_fired_total counter"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_d2c_batch_timeout_fired_total {}",
|
||||||
|
if me_allows_debug {
|
||||||
|
stats.get_me_d2c_batch_timeout_fired_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
let _ = writeln!(
|
let _ = writeln!(
|
||||||
out,
|
out,
|
||||||
"# HELP telemt_me_writer_pick_total ME writer-pick outcomes by mode and result"
|
"# HELP telemt_me_writer_pick_total ME writer-pick outcomes by mode and result"
|
||||||
|
|
@ -2145,6 +2601,16 @@ mod tests {
|
||||||
stats.increment_relay_idle_hard_close_total();
|
stats.increment_relay_idle_hard_close_total();
|
||||||
stats.increment_relay_pressure_evict_total();
|
stats.increment_relay_pressure_evict_total();
|
||||||
stats.increment_relay_protocol_desync_close_total();
|
stats.increment_relay_protocol_desync_close_total();
|
||||||
|
stats.increment_me_d2c_batches_total();
|
||||||
|
stats.add_me_d2c_batch_frames_total(3);
|
||||||
|
stats.add_me_d2c_batch_bytes_total(2048);
|
||||||
|
stats.increment_me_d2c_flush_reason(crate::stats::MeD2cFlushReason::AckImmediate);
|
||||||
|
stats.increment_me_d2c_data_frames_total();
|
||||||
|
stats.increment_me_d2c_ack_frames_total();
|
||||||
|
stats.add_me_d2c_payload_bytes_total(1800);
|
||||||
|
stats.increment_me_d2c_write_mode(crate::stats::MeD2cWriteMode::Coalesced);
|
||||||
|
stats.increment_me_d2c_quota_reject_total(crate::stats::MeD2cQuotaRejectStage::PostWrite);
|
||||||
|
stats.observe_me_d2c_frame_buf_shrink(4096);
|
||||||
stats.increment_user_connects("alice");
|
stats.increment_user_connects("alice");
|
||||||
stats.increment_user_curr_connects("alice");
|
stats.increment_user_curr_connects("alice");
|
||||||
stats.add_user_octets_from("alice", 1024);
|
stats.add_user_octets_from("alice", 1024);
|
||||||
|
|
@ -2184,6 +2650,17 @@ mod tests {
|
||||||
assert!(output.contains("telemt_relay_idle_hard_close_total 1"));
|
assert!(output.contains("telemt_relay_idle_hard_close_total 1"));
|
||||||
assert!(output.contains("telemt_relay_pressure_evict_total 1"));
|
assert!(output.contains("telemt_relay_pressure_evict_total 1"));
|
||||||
assert!(output.contains("telemt_relay_protocol_desync_close_total 1"));
|
assert!(output.contains("telemt_relay_protocol_desync_close_total 1"));
|
||||||
|
assert!(output.contains("telemt_me_d2c_batches_total 1"));
|
||||||
|
assert!(output.contains("telemt_me_d2c_batch_frames_total 3"));
|
||||||
|
assert!(output.contains("telemt_me_d2c_batch_bytes_total 2048"));
|
||||||
|
assert!(output.contains("telemt_me_d2c_flush_reason_total{reason=\"ack_immediate\"} 1"));
|
||||||
|
assert!(output.contains("telemt_me_d2c_data_frames_total 1"));
|
||||||
|
assert!(output.contains("telemt_me_d2c_ack_frames_total 1"));
|
||||||
|
assert!(output.contains("telemt_me_d2c_payload_bytes_total 1800"));
|
||||||
|
assert!(output.contains("telemt_me_d2c_write_mode_total{mode=\"coalesced\"} 1"));
|
||||||
|
assert!(output.contains("telemt_me_d2c_quota_reject_total{stage=\"post_write\"} 1"));
|
||||||
|
assert!(output.contains("telemt_me_d2c_frame_buf_shrink_total 1"));
|
||||||
|
assert!(output.contains("telemt_me_d2c_frame_buf_shrink_bytes_total 4096"));
|
||||||
assert!(output.contains("telemt_user_connections_total{user=\"alice\"} 1"));
|
assert!(output.contains("telemt_user_connections_total{user=\"alice\"} 1"));
|
||||||
assert!(output.contains("telemt_user_connections_current{user=\"alice\"} 1"));
|
assert!(output.contains("telemt_user_connections_current{user=\"alice\"} 1"));
|
||||||
assert!(output.contains("telemt_user_octets_from_client{user=\"alice\"} 1024"));
|
assert!(output.contains("telemt_user_octets_from_client{user=\"alice\"} 1024"));
|
||||||
|
|
@ -2245,6 +2722,11 @@ mod tests {
|
||||||
assert!(output.contains("# TYPE telemt_relay_idle_hard_close_total counter"));
|
assert!(output.contains("# TYPE telemt_relay_idle_hard_close_total counter"));
|
||||||
assert!(output.contains("# TYPE telemt_relay_pressure_evict_total counter"));
|
assert!(output.contains("# TYPE telemt_relay_pressure_evict_total counter"));
|
||||||
assert!(output.contains("# TYPE telemt_relay_protocol_desync_close_total counter"));
|
assert!(output.contains("# TYPE telemt_relay_protocol_desync_close_total counter"));
|
||||||
|
assert!(output.contains("# TYPE telemt_me_d2c_batches_total counter"));
|
||||||
|
assert!(output.contains("# TYPE telemt_me_d2c_flush_reason_total counter"));
|
||||||
|
assert!(output.contains("# TYPE telemt_me_d2c_write_mode_total counter"));
|
||||||
|
assert!(output.contains("# TYPE telemt_me_d2c_batch_frames_bucket_total counter"));
|
||||||
|
assert!(output.contains("# TYPE telemt_me_d2c_flush_duration_us_bucket_total counter"));
|
||||||
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
|
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
|
||||||
assert!(
|
assert!(
|
||||||
output
|
output
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ use crate::proxy::route_mode::{
|
||||||
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
|
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
|
||||||
cutover_stagger_delay,
|
cutover_stagger_delay,
|
||||||
};
|
};
|
||||||
use crate::stats::Stats;
|
use crate::stats::{MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, Stats};
|
||||||
use crate::stream::{BufferPool, CryptoReader, CryptoWriter, PooledBuffer};
|
use crate::stream::{BufferPool, CryptoReader, CryptoWriter, PooledBuffer};
|
||||||
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
|
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
|
||||||
|
|
||||||
|
|
@ -574,6 +574,49 @@ fn quota_would_be_exceeded_for_user_soft(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn classify_me_d2c_flush_reason(
|
||||||
|
flush_immediately: bool,
|
||||||
|
batch_frames: usize,
|
||||||
|
max_frames: usize,
|
||||||
|
batch_bytes: usize,
|
||||||
|
max_bytes: usize,
|
||||||
|
max_delay_fired: bool,
|
||||||
|
) -> MeD2cFlushReason {
|
||||||
|
if flush_immediately {
|
||||||
|
return MeD2cFlushReason::AckImmediate;
|
||||||
|
}
|
||||||
|
if batch_frames >= max_frames {
|
||||||
|
return MeD2cFlushReason::BatchFrames;
|
||||||
|
}
|
||||||
|
if batch_bytes >= max_bytes {
|
||||||
|
return MeD2cFlushReason::BatchBytes;
|
||||||
|
}
|
||||||
|
if max_delay_fired {
|
||||||
|
return MeD2cFlushReason::MaxDelay;
|
||||||
|
}
|
||||||
|
MeD2cFlushReason::QueueDrain
|
||||||
|
}
|
||||||
|
|
||||||
|
fn observe_me_d2c_flush_event(
|
||||||
|
stats: &Stats,
|
||||||
|
reason: MeD2cFlushReason,
|
||||||
|
batch_frames: usize,
|
||||||
|
batch_bytes: usize,
|
||||||
|
flush_duration_us: Option<u64>,
|
||||||
|
) {
|
||||||
|
stats.increment_me_d2c_flush_reason(reason);
|
||||||
|
if batch_frames > 0 || batch_bytes > 0 {
|
||||||
|
stats.increment_me_d2c_batches_total();
|
||||||
|
stats.add_me_d2c_batch_frames_total(batch_frames as u64);
|
||||||
|
stats.add_me_d2c_batch_bytes_total(batch_bytes as u64);
|
||||||
|
stats.observe_me_d2c_batch_frames(batch_frames as u64);
|
||||||
|
stats.observe_me_d2c_batch_bytes(batch_bytes as u64);
|
||||||
|
}
|
||||||
|
if let Some(duration_us) = flush_duration_us {
|
||||||
|
stats.observe_me_d2c_flush_duration_us(duration_us);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
fn quota_user_lock_test_guard() -> &'static Mutex<()> {
|
fn quota_user_lock_test_guard() -> &'static Mutex<()> {
|
||||||
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
|
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
|
||||||
|
|
@ -810,6 +853,7 @@ where
|
||||||
let mut batch_frames = 0usize;
|
let mut batch_frames = 0usize;
|
||||||
let mut batch_bytes = 0usize;
|
let mut batch_bytes = 0usize;
|
||||||
let mut flush_immediately;
|
let mut flush_immediately;
|
||||||
|
let mut max_delay_fired = false;
|
||||||
|
|
||||||
let first_is_downstream_activity =
|
let first_is_downstream_activity =
|
||||||
matches!(&first, MeResponse::Data { .. } | MeResponse::Ack(_));
|
matches!(&first, MeResponse::Data { .. } | MeResponse::Ack(_));
|
||||||
|
|
@ -838,7 +882,25 @@ where
|
||||||
flush_immediately = immediate;
|
flush_immediately = immediate;
|
||||||
}
|
}
|
||||||
MeWriterResponseOutcome::Close => {
|
MeWriterResponseOutcome::Close => {
|
||||||
|
let flush_started_at = if stats_clone.telemetry_policy().me_level.allows_debug() {
|
||||||
|
Some(Instant::now())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
let _ = writer.flush().await;
|
let _ = writer.flush().await;
|
||||||
|
let flush_duration_us = flush_started_at.map(|started| {
|
||||||
|
started
|
||||||
|
.elapsed()
|
||||||
|
.as_micros()
|
||||||
|
.min(u128::from(u64::MAX)) as u64
|
||||||
|
});
|
||||||
|
observe_me_d2c_flush_event(
|
||||||
|
stats_clone.as_ref(),
|
||||||
|
MeD2cFlushReason::Close,
|
||||||
|
batch_frames,
|
||||||
|
batch_bytes,
|
||||||
|
flush_duration_us,
|
||||||
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -878,7 +940,27 @@ where
|
||||||
flush_immediately |= immediate;
|
flush_immediately |= immediate;
|
||||||
}
|
}
|
||||||
MeWriterResponseOutcome::Close => {
|
MeWriterResponseOutcome::Close => {
|
||||||
|
let flush_started_at =
|
||||||
|
if stats_clone.telemetry_policy().me_level.allows_debug() {
|
||||||
|
Some(Instant::now())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
let _ = writer.flush().await;
|
let _ = writer.flush().await;
|
||||||
|
let flush_duration_us = flush_started_at.map(|started| {
|
||||||
|
started
|
||||||
|
.elapsed()
|
||||||
|
.as_micros()
|
||||||
|
.min(u128::from(u64::MAX))
|
||||||
|
as u64
|
||||||
|
});
|
||||||
|
observe_me_d2c_flush_event(
|
||||||
|
stats_clone.as_ref(),
|
||||||
|
MeD2cFlushReason::Close,
|
||||||
|
batch_frames,
|
||||||
|
batch_bytes,
|
||||||
|
flush_duration_us,
|
||||||
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -889,6 +971,7 @@ where
|
||||||
&& batch_frames < d2c_flush_policy.max_frames
|
&& batch_frames < d2c_flush_policy.max_frames
|
||||||
&& batch_bytes < d2c_flush_policy.max_bytes
|
&& batch_bytes < d2c_flush_policy.max_bytes
|
||||||
{
|
{
|
||||||
|
stats_clone.increment_me_d2c_batch_timeout_armed_total();
|
||||||
match tokio::time::timeout(d2c_flush_policy.max_delay, me_rx_task.recv()).await {
|
match tokio::time::timeout(d2c_flush_policy.max_delay, me_rx_task.recv()).await {
|
||||||
Ok(Some(next)) => {
|
Ok(Some(next)) => {
|
||||||
let next_is_downstream_activity =
|
let next_is_downstream_activity =
|
||||||
|
|
@ -918,7 +1001,30 @@ where
|
||||||
flush_immediately |= immediate;
|
flush_immediately |= immediate;
|
||||||
}
|
}
|
||||||
MeWriterResponseOutcome::Close => {
|
MeWriterResponseOutcome::Close => {
|
||||||
|
let flush_started_at = if stats_clone
|
||||||
|
.telemetry_policy()
|
||||||
|
.me_level
|
||||||
|
.allows_debug()
|
||||||
|
{
|
||||||
|
Some(Instant::now())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
let _ = writer.flush().await;
|
let _ = writer.flush().await;
|
||||||
|
let flush_duration_us = flush_started_at.map(|started| {
|
||||||
|
started
|
||||||
|
.elapsed()
|
||||||
|
.as_micros()
|
||||||
|
.min(u128::from(u64::MAX))
|
||||||
|
as u64
|
||||||
|
});
|
||||||
|
observe_me_d2c_flush_event(
|
||||||
|
stats_clone.as_ref(),
|
||||||
|
MeD2cFlushReason::Close,
|
||||||
|
batch_frames,
|
||||||
|
batch_bytes,
|
||||||
|
flush_duration_us,
|
||||||
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -958,7 +1064,30 @@ where
|
||||||
flush_immediately |= immediate;
|
flush_immediately |= immediate;
|
||||||
}
|
}
|
||||||
MeWriterResponseOutcome::Close => {
|
MeWriterResponseOutcome::Close => {
|
||||||
|
let flush_started_at = if stats_clone
|
||||||
|
.telemetry_policy()
|
||||||
|
.me_level
|
||||||
|
.allows_debug()
|
||||||
|
{
|
||||||
|
Some(Instant::now())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
let _ = writer.flush().await;
|
let _ = writer.flush().await;
|
||||||
|
let flush_duration_us = flush_started_at.map(|started| {
|
||||||
|
started
|
||||||
|
.elapsed()
|
||||||
|
.as_micros()
|
||||||
|
.min(u128::from(u64::MAX))
|
||||||
|
as u64
|
||||||
|
});
|
||||||
|
observe_me_d2c_flush_event(
|
||||||
|
stats_clone.as_ref(),
|
||||||
|
MeD2cFlushReason::Close,
|
||||||
|
batch_frames,
|
||||||
|
batch_bytes,
|
||||||
|
flush_duration_us,
|
||||||
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -968,16 +1097,49 @@ where
|
||||||
debug!(conn_id, "ME channel closed");
|
debug!(conn_id, "ME channel closed");
|
||||||
return Err(ProxyError::Proxy("ME connection lost".into()));
|
return Err(ProxyError::Proxy("ME connection lost".into()));
|
||||||
}
|
}
|
||||||
Err(_) => {}
|
Err(_) => {
|
||||||
|
max_delay_fired = true;
|
||||||
|
stats_clone.increment_me_d2c_batch_timeout_fired_total();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let flush_reason = classify_me_d2c_flush_reason(
|
||||||
|
flush_immediately,
|
||||||
|
batch_frames,
|
||||||
|
d2c_flush_policy.max_frames,
|
||||||
|
batch_bytes,
|
||||||
|
d2c_flush_policy.max_bytes,
|
||||||
|
max_delay_fired,
|
||||||
|
);
|
||||||
|
let flush_started_at = if stats_clone.telemetry_policy().me_level.allows_debug() {
|
||||||
|
Some(Instant::now())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
writer.flush().await.map_err(ProxyError::Io)?;
|
writer.flush().await.map_err(ProxyError::Io)?;
|
||||||
|
let flush_duration_us = flush_started_at.map(|started| {
|
||||||
|
started
|
||||||
|
.elapsed()
|
||||||
|
.as_micros()
|
||||||
|
.min(u128::from(u64::MAX)) as u64
|
||||||
|
});
|
||||||
|
observe_me_d2c_flush_event(
|
||||||
|
stats_clone.as_ref(),
|
||||||
|
flush_reason,
|
||||||
|
batch_frames,
|
||||||
|
batch_bytes,
|
||||||
|
flush_duration_us,
|
||||||
|
);
|
||||||
let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes;
|
let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes;
|
||||||
let shrink_trigger = shrink_threshold
|
let shrink_trigger = shrink_threshold
|
||||||
.saturating_mul(ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR);
|
.saturating_mul(ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR);
|
||||||
if frame_buf.capacity() > shrink_trigger {
|
if frame_buf.capacity() > shrink_trigger {
|
||||||
|
let cap_before = frame_buf.capacity();
|
||||||
frame_buf.shrink_to(shrink_threshold);
|
frame_buf.shrink_to(shrink_threshold);
|
||||||
|
let cap_after = frame_buf.capacity();
|
||||||
|
let bytes_freed = cap_before.saturating_sub(cap_after) as u64;
|
||||||
|
stats_clone.observe_me_d2c_frame_buf_shrink(bytes_freed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = &mut stop_rx => {
|
_ = &mut stop_rx => {
|
||||||
|
|
@ -1552,15 +1714,21 @@ where
|
||||||
data_len,
|
data_len,
|
||||||
quota_soft_overshoot_bytes,
|
quota_soft_overshoot_bytes,
|
||||||
) {
|
) {
|
||||||
|
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite);
|
||||||
return Err(ProxyError::DataQuotaExceeded {
|
return Err(ProxyError::DataQuotaExceeded {
|
||||||
user: user.to_string(),
|
user: user.to_string(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf).await?;
|
let write_mode =
|
||||||
|
write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
|
||||||
|
.await?;
|
||||||
|
stats.increment_me_d2c_write_mode(write_mode);
|
||||||
|
|
||||||
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
|
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
|
||||||
stats.add_user_octets_to(user, data.len() as u64);
|
stats.add_user_octets_to(user, data.len() as u64);
|
||||||
|
stats.increment_me_d2c_data_frames_total();
|
||||||
|
stats.add_me_d2c_payload_bytes_total(data.len() as u64);
|
||||||
|
|
||||||
if quota_exceeded_for_user_soft(
|
if quota_exceeded_for_user_soft(
|
||||||
stats,
|
stats,
|
||||||
|
|
@ -1568,6 +1736,7 @@ where
|
||||||
quota_limit,
|
quota_limit,
|
||||||
quota_soft_overshoot_bytes,
|
quota_soft_overshoot_bytes,
|
||||||
) {
|
) {
|
||||||
|
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PostWrite);
|
||||||
return Err(ProxyError::DataQuotaExceeded {
|
return Err(ProxyError::DataQuotaExceeded {
|
||||||
user: user.to_string(),
|
user: user.to_string(),
|
||||||
});
|
});
|
||||||
|
|
@ -1586,6 +1755,7 @@ where
|
||||||
trace!(conn_id, confirm, "ME->C quickack");
|
trace!(conn_id, confirm, "ME->C quickack");
|
||||||
}
|
}
|
||||||
write_client_ack(client_writer, proto_tag, confirm).await?;
|
write_client_ack(client_writer, proto_tag, confirm).await?;
|
||||||
|
stats.increment_me_d2c_ack_frames_total();
|
||||||
|
|
||||||
Ok(MeWriterResponseOutcome::Continue {
|
Ok(MeWriterResponseOutcome::Continue {
|
||||||
frames: 1,
|
frames: 1,
|
||||||
|
|
@ -1636,13 +1806,13 @@ async fn write_client_payload<W>(
|
||||||
data: &[u8],
|
data: &[u8],
|
||||||
rng: &SecureRandom,
|
rng: &SecureRandom,
|
||||||
frame_buf: &mut Vec<u8>,
|
frame_buf: &mut Vec<u8>,
|
||||||
) -> Result<()>
|
) -> Result<MeD2cWriteMode>
|
||||||
where
|
where
|
||||||
W: AsyncWrite + Unpin + Send + 'static,
|
W: AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
let quickack = (flags & RPC_FLAG_QUICKACK) != 0;
|
let quickack = (flags & RPC_FLAG_QUICKACK) != 0;
|
||||||
|
|
||||||
match proto_tag {
|
let write_mode = match proto_tag {
|
||||||
ProtoTag::Abridged => {
|
ProtoTag::Abridged => {
|
||||||
if !data.len().is_multiple_of(4) {
|
if !data.len().is_multiple_of(4) {
|
||||||
return Err(ProxyError::Proxy(format!(
|
return Err(ProxyError::Proxy(format!(
|
||||||
|
|
@ -1667,10 +1837,12 @@ where
|
||||||
.write_all(frame_buf.as_slice())
|
.write_all(frame_buf.as_slice())
|
||||||
.await
|
.await
|
||||||
.map_err(ProxyError::Io)?;
|
.map_err(ProxyError::Io)?;
|
||||||
|
MeD2cWriteMode::Coalesced
|
||||||
} else {
|
} else {
|
||||||
let header = [first];
|
let header = [first];
|
||||||
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
|
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
|
||||||
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
|
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
|
||||||
|
MeD2cWriteMode::Split
|
||||||
}
|
}
|
||||||
} else if len_words < (1 << 24) {
|
} else if len_words < (1 << 24) {
|
||||||
let mut first = 0x7fu8;
|
let mut first = 0x7fu8;
|
||||||
|
|
@ -1688,10 +1860,12 @@ where
|
||||||
.write_all(frame_buf.as_slice())
|
.write_all(frame_buf.as_slice())
|
||||||
.await
|
.await
|
||||||
.map_err(ProxyError::Io)?;
|
.map_err(ProxyError::Io)?;
|
||||||
|
MeD2cWriteMode::Coalesced
|
||||||
} else {
|
} else {
|
||||||
let header = [first, lw[0], lw[1], lw[2]];
|
let header = [first, lw[0], lw[1], lw[2]];
|
||||||
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
|
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
|
||||||
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
|
client_writer.write_all(data).await.map_err(ProxyError::Io)?;
|
||||||
|
MeD2cWriteMode::Split
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Err(ProxyError::Proxy(format!(
|
return Err(ProxyError::Proxy(format!(
|
||||||
|
|
@ -1729,6 +1903,7 @@ where
|
||||||
.write_all(frame_buf.as_slice())
|
.write_all(frame_buf.as_slice())
|
||||||
.await
|
.await
|
||||||
.map_err(ProxyError::Io)?;
|
.map_err(ProxyError::Io)?;
|
||||||
|
MeD2cWriteMode::Coalesced
|
||||||
} else {
|
} else {
|
||||||
let header = len_val.to_le_bytes();
|
let header = len_val.to_le_bytes();
|
||||||
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
|
client_writer.write_all(&header).await.map_err(ProxyError::Io)?;
|
||||||
|
|
@ -1745,11 +1920,12 @@ where
|
||||||
.await
|
.await
|
||||||
.map_err(ProxyError::Io)?;
|
.map_err(ProxyError::Io)?;
|
||||||
}
|
}
|
||||||
|
MeD2cWriteMode::Split
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
Ok(())
|
Ok(write_mode)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn write_client_ack<W>(
|
async fn write_client_ack<W>(
|
||||||
|
|
|
||||||
479
src/stats/mod.rs
479
src/stats/mod.rs
|
|
@ -26,6 +26,28 @@ enum RouteConnectionGauge {
|
||||||
Middle,
|
Middle,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub enum MeD2cFlushReason {
|
||||||
|
QueueDrain,
|
||||||
|
BatchFrames,
|
||||||
|
BatchBytes,
|
||||||
|
MaxDelay,
|
||||||
|
AckImmediate,
|
||||||
|
Close,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub enum MeD2cWriteMode {
|
||||||
|
Coalesced,
|
||||||
|
Split,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub enum MeD2cQuotaRejectStage {
|
||||||
|
PreWrite,
|
||||||
|
PostWrite,
|
||||||
|
}
|
||||||
|
|
||||||
#[must_use = "RouteConnectionLease must be kept alive to hold the connection gauge increment"]
|
#[must_use = "RouteConnectionLease must be kept alive to hold the connection gauge increment"]
|
||||||
pub struct RouteConnectionLease {
|
pub struct RouteConnectionLease {
|
||||||
stats: Arc<Stats>,
|
stats: Arc<Stats>,
|
||||||
|
|
@ -140,6 +162,44 @@ pub struct Stats {
|
||||||
me_route_drop_queue_full: AtomicU64,
|
me_route_drop_queue_full: AtomicU64,
|
||||||
me_route_drop_queue_full_base: AtomicU64,
|
me_route_drop_queue_full_base: AtomicU64,
|
||||||
me_route_drop_queue_full_high: AtomicU64,
|
me_route_drop_queue_full_high: AtomicU64,
|
||||||
|
me_d2c_batches_total: AtomicU64,
|
||||||
|
me_d2c_batch_frames_total: AtomicU64,
|
||||||
|
me_d2c_batch_bytes_total: AtomicU64,
|
||||||
|
me_d2c_flush_reason_queue_drain_total: AtomicU64,
|
||||||
|
me_d2c_flush_reason_batch_frames_total: AtomicU64,
|
||||||
|
me_d2c_flush_reason_batch_bytes_total: AtomicU64,
|
||||||
|
me_d2c_flush_reason_max_delay_total: AtomicU64,
|
||||||
|
me_d2c_flush_reason_ack_immediate_total: AtomicU64,
|
||||||
|
me_d2c_flush_reason_close_total: AtomicU64,
|
||||||
|
me_d2c_data_frames_total: AtomicU64,
|
||||||
|
me_d2c_ack_frames_total: AtomicU64,
|
||||||
|
me_d2c_payload_bytes_total: AtomicU64,
|
||||||
|
me_d2c_write_mode_coalesced_total: AtomicU64,
|
||||||
|
me_d2c_write_mode_split_total: AtomicU64,
|
||||||
|
me_d2c_quota_reject_pre_write_total: AtomicU64,
|
||||||
|
me_d2c_quota_reject_post_write_total: AtomicU64,
|
||||||
|
me_d2c_frame_buf_shrink_total: AtomicU64,
|
||||||
|
me_d2c_frame_buf_shrink_bytes_total: AtomicU64,
|
||||||
|
me_d2c_batch_frames_bucket_1: AtomicU64,
|
||||||
|
me_d2c_batch_frames_bucket_2_4: AtomicU64,
|
||||||
|
me_d2c_batch_frames_bucket_5_8: AtomicU64,
|
||||||
|
me_d2c_batch_frames_bucket_9_16: AtomicU64,
|
||||||
|
me_d2c_batch_frames_bucket_17_32: AtomicU64,
|
||||||
|
me_d2c_batch_frames_bucket_gt_32: AtomicU64,
|
||||||
|
me_d2c_batch_bytes_bucket_0_1k: AtomicU64,
|
||||||
|
me_d2c_batch_bytes_bucket_1k_4k: AtomicU64,
|
||||||
|
me_d2c_batch_bytes_bucket_4k_16k: AtomicU64,
|
||||||
|
me_d2c_batch_bytes_bucket_16k_64k: AtomicU64,
|
||||||
|
me_d2c_batch_bytes_bucket_64k_128k: AtomicU64,
|
||||||
|
me_d2c_batch_bytes_bucket_gt_128k: AtomicU64,
|
||||||
|
me_d2c_flush_duration_us_bucket_0_50: AtomicU64,
|
||||||
|
me_d2c_flush_duration_us_bucket_51_200: AtomicU64,
|
||||||
|
me_d2c_flush_duration_us_bucket_201_1000: AtomicU64,
|
||||||
|
me_d2c_flush_duration_us_bucket_1001_5000: AtomicU64,
|
||||||
|
me_d2c_flush_duration_us_bucket_5001_20000: AtomicU64,
|
||||||
|
me_d2c_flush_duration_us_bucket_gt_20000: AtomicU64,
|
||||||
|
me_d2c_batch_timeout_armed_total: AtomicU64,
|
||||||
|
me_d2c_batch_timeout_fired_total: AtomicU64,
|
||||||
me_writer_pick_sorted_rr_success_try_total: AtomicU64,
|
me_writer_pick_sorted_rr_success_try_total: AtomicU64,
|
||||||
me_writer_pick_sorted_rr_success_fallback_total: AtomicU64,
|
me_writer_pick_sorted_rr_success_fallback_total: AtomicU64,
|
||||||
me_writer_pick_sorted_rr_full_total: AtomicU64,
|
me_writer_pick_sorted_rr_full_total: AtomicU64,
|
||||||
|
|
@ -594,6 +654,215 @@ impl Stats {
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub fn increment_me_d2c_batches_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_d2c_batches_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn add_me_d2c_batch_frames_total(&self, frames: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_d2c_batch_frames_total
|
||||||
|
.fetch_add(frames, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn add_me_d2c_batch_bytes_total(&self, bytes: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_d2c_batch_bytes_total
|
||||||
|
.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_d2c_flush_reason(&self, reason: MeD2cFlushReason) {
|
||||||
|
if !self.telemetry_me_allows_normal() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
match reason {
|
||||||
|
MeD2cFlushReason::QueueDrain => {
|
||||||
|
self.me_d2c_flush_reason_queue_drain_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
MeD2cFlushReason::BatchFrames => {
|
||||||
|
self.me_d2c_flush_reason_batch_frames_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
MeD2cFlushReason::BatchBytes => {
|
||||||
|
self.me_d2c_flush_reason_batch_bytes_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
MeD2cFlushReason::MaxDelay => {
|
||||||
|
self.me_d2c_flush_reason_max_delay_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
MeD2cFlushReason::AckImmediate => {
|
||||||
|
self.me_d2c_flush_reason_ack_immediate_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
MeD2cFlushReason::Close => {
|
||||||
|
self.me_d2c_flush_reason_close_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_d2c_data_frames_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_d2c_data_frames_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_d2c_ack_frames_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_d2c_ack_frames_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn add_me_d2c_payload_bytes_total(&self, bytes: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_d2c_payload_bytes_total
|
||||||
|
.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_d2c_write_mode(&self, mode: MeD2cWriteMode) {
|
||||||
|
if !self.telemetry_me_allows_normal() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
match mode {
|
||||||
|
MeD2cWriteMode::Coalesced => {
|
||||||
|
self.me_d2c_write_mode_coalesced_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
MeD2cWriteMode::Split => {
|
||||||
|
self.me_d2c_write_mode_split_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_d2c_quota_reject_total(&self, stage: MeD2cQuotaRejectStage) {
|
||||||
|
if !self.telemetry_me_allows_normal() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
match stage {
|
||||||
|
MeD2cQuotaRejectStage::PreWrite => {
|
||||||
|
self.me_d2c_quota_reject_pre_write_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
MeD2cQuotaRejectStage::PostWrite => {
|
||||||
|
self.me_d2c_quota_reject_post_write_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn observe_me_d2c_frame_buf_shrink(&self, bytes_freed: u64) {
|
||||||
|
if !self.telemetry_me_allows_normal() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
self.me_d2c_frame_buf_shrink_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.me_d2c_frame_buf_shrink_bytes_total
|
||||||
|
.fetch_add(bytes_freed, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn observe_me_d2c_batch_frames(&self, frames: u64) {
|
||||||
|
if !self.telemetry_me_allows_debug() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
match frames {
|
||||||
|
0 => {}
|
||||||
|
1 => {
|
||||||
|
self.me_d2c_batch_frames_bucket_1
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
2..=4 => {
|
||||||
|
self.me_d2c_batch_frames_bucket_2_4
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
5..=8 => {
|
||||||
|
self.me_d2c_batch_frames_bucket_5_8
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
9..=16 => {
|
||||||
|
self.me_d2c_batch_frames_bucket_9_16
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
17..=32 => {
|
||||||
|
self.me_d2c_batch_frames_bucket_17_32
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
self.me_d2c_batch_frames_bucket_gt_32
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn observe_me_d2c_batch_bytes(&self, bytes: u64) {
|
||||||
|
if !self.telemetry_me_allows_debug() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
match bytes {
|
||||||
|
0..=1024 => {
|
||||||
|
self.me_d2c_batch_bytes_bucket_0_1k
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
1025..=4096 => {
|
||||||
|
self.me_d2c_batch_bytes_bucket_1k_4k
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
4097..=16_384 => {
|
||||||
|
self.me_d2c_batch_bytes_bucket_4k_16k
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
16_385..=65_536 => {
|
||||||
|
self.me_d2c_batch_bytes_bucket_16k_64k
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
65_537..=131_072 => {
|
||||||
|
self.me_d2c_batch_bytes_bucket_64k_128k
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
self.me_d2c_batch_bytes_bucket_gt_128k
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn observe_me_d2c_flush_duration_us(&self, duration_us: u64) {
|
||||||
|
if !self.telemetry_me_allows_debug() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
match duration_us {
|
||||||
|
0..=50 => {
|
||||||
|
self.me_d2c_flush_duration_us_bucket_0_50
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
51..=200 => {
|
||||||
|
self.me_d2c_flush_duration_us_bucket_51_200
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
201..=1000 => {
|
||||||
|
self.me_d2c_flush_duration_us_bucket_201_1000
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
1001..=5000 => {
|
||||||
|
self.me_d2c_flush_duration_us_bucket_1001_5000
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
5001..=20_000 => {
|
||||||
|
self.me_d2c_flush_duration_us_bucket_5001_20000
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
self.me_d2c_flush_duration_us_bucket_gt_20000
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_d2c_batch_timeout_armed_total(&self) {
|
||||||
|
if self.telemetry_me_allows_debug() {
|
||||||
|
self.me_d2c_batch_timeout_armed_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_d2c_batch_timeout_fired_total(&self) {
|
||||||
|
if self.telemetry_me_allows_debug() {
|
||||||
|
self.me_d2c_batch_timeout_fired_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
pub fn increment_me_writer_pick_success_try_total(&self, mode: MeWriterPickMode) {
|
pub fn increment_me_writer_pick_success_try_total(&self, mode: MeWriterPickMode) {
|
||||||
if !self.telemetry_me_allows_normal() {
|
if !self.telemetry_me_allows_normal() {
|
||||||
return;
|
return;
|
||||||
|
|
@ -1229,6 +1498,142 @@ impl Stats {
|
||||||
pub fn get_me_route_drop_queue_full_high(&self) -> u64 {
|
pub fn get_me_route_drop_queue_full_high(&self) -> u64 {
|
||||||
self.me_route_drop_queue_full_high.load(Ordering::Relaxed)
|
self.me_route_drop_queue_full_high.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
pub fn get_me_d2c_batches_total(&self) -> u64 {
|
||||||
|
self.me_d2c_batches_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_frames_total(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_frames_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_bytes_total(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_bytes_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_flush_reason_queue_drain_total(&self) -> u64 {
|
||||||
|
self.me_d2c_flush_reason_queue_drain_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_flush_reason_batch_frames_total(&self) -> u64 {
|
||||||
|
self.me_d2c_flush_reason_batch_frames_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_flush_reason_batch_bytes_total(&self) -> u64 {
|
||||||
|
self.me_d2c_flush_reason_batch_bytes_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_flush_reason_max_delay_total(&self) -> u64 {
|
||||||
|
self.me_d2c_flush_reason_max_delay_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_flush_reason_ack_immediate_total(&self) -> u64 {
|
||||||
|
self.me_d2c_flush_reason_ack_immediate_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_flush_reason_close_total(&self) -> u64 {
|
||||||
|
self.me_d2c_flush_reason_close_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_data_frames_total(&self) -> u64 {
|
||||||
|
self.me_d2c_data_frames_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_ack_frames_total(&self) -> u64 {
|
||||||
|
self.me_d2c_ack_frames_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_payload_bytes_total(&self) -> u64 {
|
||||||
|
self.me_d2c_payload_bytes_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_write_mode_coalesced_total(&self) -> u64 {
|
||||||
|
self.me_d2c_write_mode_coalesced_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_write_mode_split_total(&self) -> u64 {
|
||||||
|
self.me_d2c_write_mode_split_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_quota_reject_pre_write_total(&self) -> u64 {
|
||||||
|
self.me_d2c_quota_reject_pre_write_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_quota_reject_post_write_total(&self) -> u64 {
|
||||||
|
self.me_d2c_quota_reject_post_write_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_frame_buf_shrink_total(&self) -> u64 {
|
||||||
|
self.me_d2c_frame_buf_shrink_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_frame_buf_shrink_bytes_total(&self) -> u64 {
|
||||||
|
self.me_d2c_frame_buf_shrink_bytes_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_frames_bucket_1(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_frames_bucket_1.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_frames_bucket_2_4(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_frames_bucket_2_4.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_frames_bucket_5_8(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_frames_bucket_5_8.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_frames_bucket_9_16(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_frames_bucket_9_16.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_frames_bucket_17_32(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_frames_bucket_17_32
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_frames_bucket_gt_32(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_frames_bucket_gt_32
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_bytes_bucket_0_1k(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_bytes_bucket_0_1k.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_bytes_bucket_1k_4k(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_bytes_bucket_1k_4k.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_bytes_bucket_4k_16k(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_bytes_bucket_4k_16k.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_bytes_bucket_16k_64k(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_bytes_bucket_16k_64k
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_bytes_bucket_64k_128k(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_bytes_bucket_64k_128k
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_bytes_bucket_gt_128k(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_bytes_bucket_gt_128k
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_flush_duration_us_bucket_0_50(&self) -> u64 {
|
||||||
|
self.me_d2c_flush_duration_us_bucket_0_50
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_flush_duration_us_bucket_51_200(&self) -> u64 {
|
||||||
|
self.me_d2c_flush_duration_us_bucket_51_200
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_flush_duration_us_bucket_201_1000(&self) -> u64 {
|
||||||
|
self.me_d2c_flush_duration_us_bucket_201_1000
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_flush_duration_us_bucket_1001_5000(&self) -> u64 {
|
||||||
|
self.me_d2c_flush_duration_us_bucket_1001_5000
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_flush_duration_us_bucket_5001_20000(&self) -> u64 {
|
||||||
|
self.me_d2c_flush_duration_us_bucket_5001_20000
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_flush_duration_us_bucket_gt_20000(&self) -> u64 {
|
||||||
|
self.me_d2c_flush_duration_us_bucket_gt_20000
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_timeout_armed_total(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_timeout_armed_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_d2c_batch_timeout_fired_total(&self) -> u64 {
|
||||||
|
self.me_d2c_batch_timeout_fired_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
pub fn get_me_writer_pick_sorted_rr_success_try_total(&self) -> u64 {
|
pub fn get_me_writer_pick_sorted_rr_success_try_total(&self) -> u64 {
|
||||||
self.me_writer_pick_sorted_rr_success_try_total
|
self.me_writer_pick_sorted_rr_success_try_total
|
||||||
.load(Ordering::Relaxed)
|
.load(Ordering::Relaxed)
|
||||||
|
|
@ -1898,9 +2303,83 @@ mod tests {
|
||||||
stats.increment_me_crc_mismatch();
|
stats.increment_me_crc_mismatch();
|
||||||
stats.increment_me_keepalive_sent();
|
stats.increment_me_keepalive_sent();
|
||||||
stats.increment_me_route_drop_queue_full();
|
stats.increment_me_route_drop_queue_full();
|
||||||
|
stats.increment_me_d2c_batches_total();
|
||||||
|
stats.add_me_d2c_batch_frames_total(4);
|
||||||
|
stats.add_me_d2c_batch_bytes_total(4096);
|
||||||
|
stats.increment_me_d2c_flush_reason(MeD2cFlushReason::BatchBytes);
|
||||||
|
stats.increment_me_d2c_write_mode(MeD2cWriteMode::Coalesced);
|
||||||
|
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite);
|
||||||
|
stats.observe_me_d2c_frame_buf_shrink(1024);
|
||||||
|
stats.observe_me_d2c_batch_frames(4);
|
||||||
|
stats.observe_me_d2c_batch_bytes(4096);
|
||||||
|
stats.observe_me_d2c_flush_duration_us(120);
|
||||||
|
stats.increment_me_d2c_batch_timeout_armed_total();
|
||||||
|
stats.increment_me_d2c_batch_timeout_fired_total();
|
||||||
assert_eq!(stats.get_me_crc_mismatch(), 0);
|
assert_eq!(stats.get_me_crc_mismatch(), 0);
|
||||||
assert_eq!(stats.get_me_keepalive_sent(), 0);
|
assert_eq!(stats.get_me_keepalive_sent(), 0);
|
||||||
assert_eq!(stats.get_me_route_drop_queue_full(), 0);
|
assert_eq!(stats.get_me_route_drop_queue_full(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_batches_total(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_flush_reason_batch_bytes_total(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_write_mode_coalesced_total(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_quota_reject_pre_write_total(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_frame_buf_shrink_total(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_frames_bucket_2_4(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_bytes_bucket_1k_4k(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_51_200(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_telemetry_policy_me_normal_blocks_d2c_debug_metrics() {
|
||||||
|
let stats = Stats::new();
|
||||||
|
stats.apply_telemetry_policy(TelemetryPolicy {
|
||||||
|
core_enabled: true,
|
||||||
|
user_enabled: true,
|
||||||
|
me_level: MeTelemetryLevel::Normal,
|
||||||
|
});
|
||||||
|
|
||||||
|
stats.increment_me_d2c_batches_total();
|
||||||
|
stats.add_me_d2c_batch_frames_total(2);
|
||||||
|
stats.add_me_d2c_batch_bytes_total(2048);
|
||||||
|
stats.increment_me_d2c_flush_reason(MeD2cFlushReason::QueueDrain);
|
||||||
|
stats.observe_me_d2c_batch_frames(2);
|
||||||
|
stats.observe_me_d2c_batch_bytes(2048);
|
||||||
|
stats.observe_me_d2c_flush_duration_us(100);
|
||||||
|
stats.increment_me_d2c_batch_timeout_armed_total();
|
||||||
|
stats.increment_me_d2c_batch_timeout_fired_total();
|
||||||
|
|
||||||
|
assert_eq!(stats.get_me_d2c_batches_total(), 1);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_frames_total(), 2);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_bytes_total(), 2048);
|
||||||
|
assert_eq!(stats.get_me_d2c_flush_reason_queue_drain_total(), 1);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_frames_bucket_2_4(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_bytes_bucket_1k_4k(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_51_200(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 0);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_telemetry_policy_me_debug_enables_d2c_debug_metrics() {
|
||||||
|
let stats = Stats::new();
|
||||||
|
stats.apply_telemetry_policy(TelemetryPolicy {
|
||||||
|
core_enabled: true,
|
||||||
|
user_enabled: true,
|
||||||
|
me_level: MeTelemetryLevel::Debug,
|
||||||
|
});
|
||||||
|
|
||||||
|
stats.observe_me_d2c_batch_frames(7);
|
||||||
|
stats.observe_me_d2c_batch_bytes(70_000);
|
||||||
|
stats.observe_me_d2c_flush_duration_us(1400);
|
||||||
|
stats.increment_me_d2c_batch_timeout_armed_total();
|
||||||
|
stats.increment_me_d2c_batch_timeout_fired_total();
|
||||||
|
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_frames_bucket_5_8(), 1);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_bytes_bucket_64k_128k(), 1);
|
||||||
|
assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_1001_5000(), 1);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 1);
|
||||||
|
assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue