From 0461bc65c60d1a77c252e2d5f555157b58a224d1 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 22 Mar 2026 15:00:15 +0300 Subject: [PATCH 1/2] DC -> Client Optimizations --- src/config/defaults.rs | 10 + src/config/hot_reload.rs | 14 +- src/config/load.rs | 13 ++ src/config/types.rs | 12 +- src/proxy/middle_relay.rs | 181 +++++++++++++----- .../tests/middle_relay_security_tests.rs | 8 + src/transport/middle_proxy/reader.rs | 12 +- 7 files changed, 190 insertions(+), 60 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 650d70d..66ffeda 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -29,6 +29,8 @@ const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_FRAMES: usize = 32; const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES: usize = 128 * 1024; const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 500; const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = true; +const DEFAULT_ME_QUOTA_SOFT_OVERSHOOT_BYTES: u64 = 64 * 1024; +const DEFAULT_ME_D2C_FRAME_BUF_SHRINK_THRESHOLD_BYTES: usize = 256 * 1024; const DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES: usize = 64 * 1024; const DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES: usize = 256 * 1024; const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3; @@ -387,6 +389,14 @@ pub(crate) fn default_me_d2c_ack_flush_immediate() -> bool { DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE } +pub(crate) fn default_me_quota_soft_overshoot_bytes() -> u64 { + DEFAULT_ME_QUOTA_SOFT_OVERSHOOT_BYTES +} + +pub(crate) fn default_me_d2c_frame_buf_shrink_threshold_bytes() -> usize { + DEFAULT_ME_D2C_FRAME_BUF_SHRINK_THRESHOLD_BYTES +} + pub(crate) fn default_direct_relay_copy_buf_c2s_bytes() -> usize { DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 39c31a1..e580b7f 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -106,6 +106,8 @@ pub struct HotFields { pub me_d2c_flush_batch_max_bytes: usize, pub me_d2c_flush_batch_max_delay_us: u64, pub me_d2c_ack_flush_immediate: bool, + pub me_quota_soft_overshoot_bytes: u64, + pub me_d2c_frame_buf_shrink_threshold_bytes: usize, pub direct_relay_copy_buf_c2s_bytes: usize, pub direct_relay_copy_buf_s2c_bytes: usize, pub me_health_interval_ms_unhealthy: u64, @@ -225,6 +227,8 @@ impl HotFields { me_d2c_flush_batch_max_bytes: cfg.general.me_d2c_flush_batch_max_bytes, me_d2c_flush_batch_max_delay_us: cfg.general.me_d2c_flush_batch_max_delay_us, me_d2c_ack_flush_immediate: cfg.general.me_d2c_ack_flush_immediate, + me_quota_soft_overshoot_bytes: cfg.general.me_quota_soft_overshoot_bytes, + me_d2c_frame_buf_shrink_threshold_bytes: cfg.general.me_d2c_frame_buf_shrink_threshold_bytes, direct_relay_copy_buf_c2s_bytes: cfg.general.direct_relay_copy_buf_c2s_bytes, direct_relay_copy_buf_s2c_bytes: cfg.general.direct_relay_copy_buf_s2c_bytes, me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy, @@ -511,6 +515,9 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig { cfg.general.me_d2c_flush_batch_max_bytes = new.general.me_d2c_flush_batch_max_bytes; cfg.general.me_d2c_flush_batch_max_delay_us = new.general.me_d2c_flush_batch_max_delay_us; cfg.general.me_d2c_ack_flush_immediate = new.general.me_d2c_ack_flush_immediate; + cfg.general.me_quota_soft_overshoot_bytes = new.general.me_quota_soft_overshoot_bytes; + cfg.general.me_d2c_frame_buf_shrink_threshold_bytes = + new.general.me_d2c_frame_buf_shrink_threshold_bytes; cfg.general.direct_relay_copy_buf_c2s_bytes = new.general.direct_relay_copy_buf_c2s_bytes; cfg.general.direct_relay_copy_buf_s2c_bytes = new.general.direct_relay_copy_buf_s2c_bytes; cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy; @@ -1030,15 +1037,20 @@ fn log_changes( || old_hot.me_d2c_flush_batch_max_bytes != new_hot.me_d2c_flush_batch_max_bytes || old_hot.me_d2c_flush_batch_max_delay_us != new_hot.me_d2c_flush_batch_max_delay_us || old_hot.me_d2c_ack_flush_immediate != new_hot.me_d2c_ack_flush_immediate + || old_hot.me_quota_soft_overshoot_bytes != new_hot.me_quota_soft_overshoot_bytes + || old_hot.me_d2c_frame_buf_shrink_threshold_bytes + != new_hot.me_d2c_frame_buf_shrink_threshold_bytes || old_hot.direct_relay_copy_buf_c2s_bytes != new_hot.direct_relay_copy_buf_c2s_bytes || old_hot.direct_relay_copy_buf_s2c_bytes != new_hot.direct_relay_copy_buf_s2c_bytes { info!( - "config reload: relay_tuning: me_d2c_frames={} me_d2c_bytes={} me_d2c_delay_us={} me_ack_flush_immediate={} direct_buf_c2s={} direct_buf_s2c={}", + "config reload: relay_tuning: me_d2c_frames={} me_d2c_bytes={} me_d2c_delay_us={} me_ack_flush_immediate={} me_quota_soft_overshoot_bytes={} me_d2c_frame_buf_shrink_threshold_bytes={} direct_buf_c2s={} direct_buf_s2c={}", new_hot.me_d2c_flush_batch_max_frames, new_hot.me_d2c_flush_batch_max_bytes, new_hot.me_d2c_flush_batch_max_delay_us, new_hot.me_d2c_ack_flush_immediate, + new_hot.me_quota_soft_overshoot_bytes, + new_hot.me_d2c_frame_buf_shrink_threshold_bytes, new_hot.direct_relay_copy_buf_c2s_bytes, new_hot.direct_relay_copy_buf_s2c_bytes, ); diff --git a/src/config/load.rs b/src/config/load.rs index 2382878..bf6d036 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -533,6 +533,19 @@ impl ProxyConfig { )); } + if config.general.me_quota_soft_overshoot_bytes > 16 * 1024 * 1024 { + return Err(ProxyError::Config( + "general.me_quota_soft_overshoot_bytes must be within [0, 16777216]".to_string(), + )); + } + + if !(4096..=16 * 1024 * 1024).contains(&config.general.me_d2c_frame_buf_shrink_threshold_bytes) { + return Err(ProxyError::Config( + "general.me_d2c_frame_buf_shrink_threshold_bytes must be within [4096, 16777216]" + .to_string(), + )); + } + if !(4096..=1024 * 1024).contains(&config.general.direct_relay_copy_buf_c2s_bytes) { return Err(ProxyError::Config( "general.direct_relay_copy_buf_c2s_bytes must be within [4096, 1048576]" diff --git a/src/config/types.rs b/src/config/types.rs index 1c5423e..aa58dc1 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -468,7 +468,7 @@ pub struct GeneralConfig { pub me_c2me_send_timeout_ms: u64, /// Bounded wait in milliseconds for routing ME DATA to per-connection queue. - /// `0` keeps legacy no-wait behavior. + /// `0` keeps non-blocking routing; values >0 enable bounded wait for compatibility. #[serde(default = "default_me_reader_route_data_wait_ms")] pub me_reader_route_data_wait_ms: u64, @@ -489,6 +489,14 @@ pub struct GeneralConfig { #[serde(default = "default_me_d2c_ack_flush_immediate")] pub me_d2c_ack_flush_immediate: bool, + /// Additional bytes above strict per-user quota allowed in hot-path soft mode. + #[serde(default = "default_me_quota_soft_overshoot_bytes")] + pub me_quota_soft_overshoot_bytes: u64, + + /// Shrink threshold for reusable ME->Client frame assembly buffer. + #[serde(default = "default_me_d2c_frame_buf_shrink_threshold_bytes")] + pub me_d2c_frame_buf_shrink_threshold_bytes: usize, + /// Copy buffer size for client->DC direction in direct relay. #[serde(default = "default_direct_relay_copy_buf_c2s_bytes")] pub direct_relay_copy_buf_c2s_bytes: usize, @@ -945,6 +953,8 @@ impl Default for GeneralConfig { me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(), me_d2c_flush_batch_max_delay_us: default_me_d2c_flush_batch_max_delay_us(), me_d2c_ack_flush_immediate: default_me_d2c_ack_flush_immediate(), + me_quota_soft_overshoot_bytes: default_me_quota_soft_overshoot_bytes(), + me_d2c_frame_buf_shrink_threshold_bytes: default_me_d2c_frame_buf_shrink_threshold_bytes(), direct_relay_copy_buf_c2s_bytes: default_direct_relay_copy_buf_c2s_bytes(), direct_relay_copy_buf_s2c_bytes: default_direct_relay_copy_buf_s2c_bytes(), me_warmup_stagger_enabled: default_true(), diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index f56a606..2c9fe0c 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -45,6 +45,8 @@ const C2ME_SEND_TIMEOUT: Duration = Duration::from_millis(50); const C2ME_SEND_TIMEOUT: Duration = Duration::from_secs(5); const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1; const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096; +const ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR: usize = 2; +const ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES: usize = 128 * 1024; #[cfg(test)] const QUOTA_USER_LOCKS_MAX: usize = 64; #[cfg(not(test))] @@ -214,6 +216,8 @@ struct MeD2cFlushPolicy { max_bytes: usize, max_delay: Duration, ack_flush_immediate: bool, + quota_soft_overshoot_bytes: u64, + frame_buf_shrink_threshold_bytes: usize, } #[derive(Clone, Copy)] @@ -284,6 +288,11 @@ impl MeD2cFlushPolicy { .max(ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN), max_delay: Duration::from_micros(config.general.me_d2c_flush_batch_max_delay_us), ack_flush_immediate: config.general.me_d2c_ack_flush_immediate, + quota_soft_overshoot_bytes: config.general.me_quota_soft_overshoot_bytes, + frame_buf_shrink_threshold_bytes: config + .general + .me_d2c_frame_buf_shrink_threshold_bytes + .max(4096), } } } @@ -538,6 +547,33 @@ fn quota_would_be_exceeded_for_user( }) } +fn quota_soft_cap(limit: u64, overshoot: u64) -> u64 { + limit.saturating_add(overshoot) +} + +fn quota_exceeded_for_user_soft( + stats: &Stats, + user: &str, + quota_limit: Option, + overshoot: u64, +) -> bool { + quota_limit.is_some_and(|quota| stats.get_user_total_octets(user) >= quota_soft_cap(quota, overshoot)) +} + +fn quota_would_be_exceeded_for_user_soft( + stats: &Stats, + user: &str, + quota_limit: Option, + bytes: u64, + overshoot: u64, +) -> bool { + quota_limit.is_some_and(|quota| { + let cap = quota_soft_cap(quota, overshoot); + let used = stats.get_user_total_octets(user); + used >= cap || bytes > cap.saturating_sub(used) + }) +} + #[cfg(test)] fn quota_user_lock_test_guard() -> &'static Mutex<()> { static TEST_LOCK: OnceLock> = OnceLock::new(); @@ -786,6 +822,7 @@ where stats_clone.as_ref(), &user_clone, quota_limit, + d2c_flush_policy.quota_soft_overshoot_bytes, bytes_me2c_clone.as_ref(), conn_id, d2c_flush_policy.ack_flush_immediate, @@ -825,6 +862,7 @@ where stats_clone.as_ref(), &user_clone, quota_limit, + d2c_flush_policy.quota_soft_overshoot_bytes, bytes_me2c_clone.as_ref(), conn_id, d2c_flush_policy.ack_flush_immediate, @@ -864,6 +902,7 @@ where stats_clone.as_ref(), &user_clone, quota_limit, + d2c_flush_policy.quota_soft_overshoot_bytes, bytes_me2c_clone.as_ref(), conn_id, d2c_flush_policy.ack_flush_immediate, @@ -903,6 +942,7 @@ where stats_clone.as_ref(), &user_clone, quota_limit, + d2c_flush_policy.quota_soft_overshoot_bytes, bytes_me2c_clone.as_ref(), conn_id, d2c_flush_policy.ack_flush_immediate, @@ -933,6 +973,12 @@ where } writer.flush().await.map_err(ProxyError::Io)?; + let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes; + let shrink_trigger = shrink_threshold + .saturating_mul(ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR); + if frame_buf.capacity() > shrink_trigger { + frame_buf.shrink_to(shrink_threshold); + } } _ = &mut stop_rx => { debug!(conn_id, "ME writer stop signal"); @@ -1482,6 +1528,7 @@ async fn process_me_writer_response( stats: &Stats, user: &str, quota_limit: Option, + quota_soft_overshoot_bytes: u64, bytes_me2c: &AtomicU64, conn_id: u64, ack_flush_immediate: bool, @@ -1498,31 +1545,32 @@ where trace!(conn_id, bytes = data.len(), flags, "ME->C data"); } let data_len = data.len() as u64; - if let Some(limit) = quota_limit { - let quota_lock = quota_user_lock(user); - let _quota_guard = quota_lock.lock().await; - if quota_would_be_exceeded_for_user(stats, user, Some(limit), data_len) { - return Err(ProxyError::DataQuotaExceeded { - user: user.to_string(), - }); - } - write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf) - .await?; + if quota_would_be_exceeded_for_user_soft( + stats, + user, + quota_limit, + data_len, + quota_soft_overshoot_bytes, + ) { + return Err(ProxyError::DataQuotaExceeded { + user: user.to_string(), + }); + } - bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed); - stats.add_user_octets_to(user, data.len() as u64); + write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf).await?; - if quota_exceeded_for_user(stats, user, Some(limit)) { - return Err(ProxyError::DataQuotaExceeded { - user: user.to_string(), - }); - } - } else { - write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf) - .await?; + bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed); + stats.add_user_octets_to(user, data.len() as u64); - bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed); - stats.add_user_octets_to(user, data.len() as u64); + if quota_exceeded_for_user_soft( + stats, + user, + quota_limit, + quota_soft_overshoot_bytes, + ) { + return Err(ProxyError::DataQuotaExceeded { + user: user.to_string(), + }); } Ok(MeWriterResponseOutcome::Continue { @@ -1609,28 +1657,42 @@ where if quickack { first |= 0x80; } - frame_buf.clear(); - frame_buf.reserve(1 + data.len()); - frame_buf.push(first); - frame_buf.extend_from_slice(data); - client_writer - .write_all(frame_buf) - .await - .map_err(ProxyError::Io)?; + let wire_len = 1usize.saturating_add(data.len()); + if wire_len <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES { + frame_buf.clear(); + frame_buf.reserve(wire_len); + frame_buf.push(first); + frame_buf.extend_from_slice(data); + client_writer + .write_all(frame_buf.as_slice()) + .await + .map_err(ProxyError::Io)?; + } else { + let header = [first]; + client_writer.write_all(&header).await.map_err(ProxyError::Io)?; + client_writer.write_all(data).await.map_err(ProxyError::Io)?; + } } else if len_words < (1 << 24) { let mut first = 0x7fu8; if quickack { first |= 0x80; } let lw = (len_words as u32).to_le_bytes(); - frame_buf.clear(); - frame_buf.reserve(4 + data.len()); - frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]); - frame_buf.extend_from_slice(data); - client_writer - .write_all(frame_buf) - .await - .map_err(ProxyError::Io)?; + let wire_len = 4usize.saturating_add(data.len()); + if wire_len <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES { + frame_buf.clear(); + frame_buf.reserve(wire_len); + frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]); + frame_buf.extend_from_slice(data); + client_writer + .write_all(frame_buf.as_slice()) + .await + .map_err(ProxyError::Io)?; + } else { + let header = [first, lw[0], lw[1], lw[2]]; + client_writer.write_all(&header).await.map_err(ProxyError::Io)?; + client_writer.write_all(data).await.map_err(ProxyError::Io)?; + } } else { return Err(ProxyError::Proxy(format!( "Abridged frame too large: {}", @@ -1650,21 +1712,40 @@ where } else { 0 }; + let (len_val, total) = compute_intermediate_secure_wire_len(data.len(), padding_len, quickack)?; - frame_buf.clear(); - frame_buf.reserve(total); - frame_buf.extend_from_slice(&len_val.to_le_bytes()); - frame_buf.extend_from_slice(data); - if padding_len > 0 { - let start = frame_buf.len(); - frame_buf.resize(start + padding_len, 0); - rng.fill(&mut frame_buf[start..]); + if total <= ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES { + frame_buf.clear(); + frame_buf.reserve(total); + frame_buf.extend_from_slice(&len_val.to_le_bytes()); + frame_buf.extend_from_slice(data); + if padding_len > 0 { + let start = frame_buf.len(); + frame_buf.resize(start + padding_len, 0); + rng.fill(&mut frame_buf[start..]); + } + client_writer + .write_all(frame_buf.as_slice()) + .await + .map_err(ProxyError::Io)?; + } else { + let header = len_val.to_le_bytes(); + client_writer.write_all(&header).await.map_err(ProxyError::Io)?; + client_writer.write_all(data).await.map_err(ProxyError::Io)?; + if padding_len > 0 { + frame_buf.clear(); + if frame_buf.capacity() < padding_len { + frame_buf.reserve(padding_len); + } + frame_buf.resize(padding_len, 0); + rng.fill(frame_buf.as_mut_slice()); + client_writer + .write_all(frame_buf.as_slice()) + .await + .map_err(ProxyError::Io)?; + } } - client_writer - .write_all(frame_buf) - .await - .map_err(ProxyError::Io)?; } } diff --git a/src/proxy/tests/middle_relay_security_tests.rs b/src/proxy/tests/middle_relay_security_tests.rs index 3be9524..1d3b736 100644 --- a/src/proxy/tests/middle_relay_security_tests.rs +++ b/src/proxy/tests/middle_relay_security_tests.rs @@ -1540,6 +1540,7 @@ async fn process_me_writer_response_ack_obeys_flush_policy() { &stats, "user", None, + 0, &bytes_me2c, 77, true, @@ -1566,6 +1567,7 @@ async fn process_me_writer_response_ack_obeys_flush_policy() { &stats, "user", None, + 0, &bytes_me2c, 77, false, @@ -1606,6 +1608,7 @@ async fn process_me_writer_response_data_updates_byte_accounting() { &stats, "user", None, + 0, &bytes_me2c, 88, false, @@ -1652,6 +1655,7 @@ async fn process_me_writer_response_data_enforces_live_user_quota() { &stats, "quota-user", Some(12), + 0, &bytes_me2c, 89, false, @@ -1700,6 +1704,7 @@ async fn process_me_writer_response_concurrent_same_user_quota_does_not_overshoo &stats, user, Some(1), + 0, &bytes_me2c, 91, false, @@ -1717,6 +1722,7 @@ async fn process_me_writer_response_concurrent_same_user_quota_does_not_overshoo &stats, user, Some(1), + 0, &bytes_me2c, 92, false, @@ -1765,6 +1771,7 @@ async fn process_me_writer_response_data_does_not_forward_partial_payload_when_r &stats, "partial-quota-user", Some(4), + 0, &bytes_me2c, 90, false, @@ -1970,6 +1977,7 @@ async fn run_quota_race_attempt( stats, user, Some(1), + 0, bytes_me2c, conn_id, false, diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index 4137b2b..46acd7e 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -126,14 +126,10 @@ pub(crate) async fn reader_loop( let data = body.slice(12..); trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS"); - let data_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed); - let routed = if data_wait_ms == 0 { - reg.route_nowait(cid, MeResponse::Data { flags, data }) - .await - } else { - reg.route_with_timeout(cid, MeResponse::Data { flags, data }, data_wait_ms) - .await - }; + let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed); + let routed = reg + .route_with_timeout(cid, MeResponse::Data { flags, data }, route_wait_ms) + .await; if !matches!(routed, RouteResult::Routed) { match routed { RouteResult::NoConn => stats.increment_me_route_drop_no_conn(), From 7b570be5b3909ab0eea3f279c84041ea138bb433 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 22 Mar 2026 15:28:55 +0300 Subject: [PATCH 2/2] DC -> Client Runtime in Metrics and API --- src/api/model.rs | 18 ++ src/api/runtime_stats.rs | 19 ++ src/metrics.rs | 482 ++++++++++++++++++++++++++++++++++++++ src/proxy/middle_relay.rs | 190 ++++++++++++++- src/stats/mod.rs | 479 +++++++++++++++++++++++++++++++++++++ 5 files changed, 1181 insertions(+), 7 deletions(-) diff --git a/src/api/model.rs b/src/api/model.rs index 94b50f6..8ae0c0b 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -174,6 +174,24 @@ pub(super) struct ZeroMiddleProxyData { pub(super) route_drop_queue_full_total: u64, pub(super) route_drop_queue_full_base_total: u64, pub(super) route_drop_queue_full_high_total: u64, + pub(super) d2c_batches_total: u64, + pub(super) d2c_batch_frames_total: u64, + pub(super) d2c_batch_bytes_total: u64, + pub(super) d2c_flush_reason_queue_drain_total: u64, + pub(super) d2c_flush_reason_batch_frames_total: u64, + pub(super) d2c_flush_reason_batch_bytes_total: u64, + pub(super) d2c_flush_reason_max_delay_total: u64, + pub(super) d2c_flush_reason_ack_immediate_total: u64, + pub(super) d2c_flush_reason_close_total: u64, + pub(super) d2c_data_frames_total: u64, + pub(super) d2c_ack_frames_total: u64, + pub(super) d2c_payload_bytes_total: u64, + pub(super) d2c_write_mode_coalesced_total: u64, + pub(super) d2c_write_mode_split_total: u64, + pub(super) d2c_quota_reject_pre_write_total: u64, + pub(super) d2c_quota_reject_post_write_total: u64, + pub(super) d2c_frame_buf_shrink_total: u64, + pub(super) d2c_frame_buf_shrink_bytes_total: u64, pub(super) socks_kdf_strict_reject_total: u64, pub(super) socks_kdf_compat_fallback_total: u64, pub(super) endpoint_quarantine_total: u64, diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index 94f27a9..b66d1a5 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -68,6 +68,25 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer route_drop_queue_full_total: stats.get_me_route_drop_queue_full(), route_drop_queue_full_base_total: stats.get_me_route_drop_queue_full_base(), route_drop_queue_full_high_total: stats.get_me_route_drop_queue_full_high(), + d2c_batches_total: stats.get_me_d2c_batches_total(), + d2c_batch_frames_total: stats.get_me_d2c_batch_frames_total(), + d2c_batch_bytes_total: stats.get_me_d2c_batch_bytes_total(), + d2c_flush_reason_queue_drain_total: stats.get_me_d2c_flush_reason_queue_drain_total(), + d2c_flush_reason_batch_frames_total: stats.get_me_d2c_flush_reason_batch_frames_total(), + d2c_flush_reason_batch_bytes_total: stats.get_me_d2c_flush_reason_batch_bytes_total(), + d2c_flush_reason_max_delay_total: stats.get_me_d2c_flush_reason_max_delay_total(), + d2c_flush_reason_ack_immediate_total: stats + .get_me_d2c_flush_reason_ack_immediate_total(), + d2c_flush_reason_close_total: stats.get_me_d2c_flush_reason_close_total(), + d2c_data_frames_total: stats.get_me_d2c_data_frames_total(), + d2c_ack_frames_total: stats.get_me_d2c_ack_frames_total(), + d2c_payload_bytes_total: stats.get_me_d2c_payload_bytes_total(), + d2c_write_mode_coalesced_total: stats.get_me_d2c_write_mode_coalesced_total(), + d2c_write_mode_split_total: stats.get_me_d2c_write_mode_split_total(), + d2c_quota_reject_pre_write_total: stats.get_me_d2c_quota_reject_pre_write_total(), + d2c_quota_reject_post_write_total: stats.get_me_d2c_quota_reject_post_write_total(), + d2c_frame_buf_shrink_total: stats.get_me_d2c_frame_buf_shrink_total(), + d2c_frame_buf_shrink_bytes_total: stats.get_me_d2c_frame_buf_shrink_bytes_total(), socks_kdf_strict_reject_total: stats.get_me_socks_kdf_strict_reject(), socks_kdf_compat_fallback_total: stats.get_me_socks_kdf_compat_fallback(), endpoint_quarantine_total: stats.get_me_endpoint_quarantine_total(), diff --git a/src/metrics.rs b/src/metrics.rs index 2560294..a821d4d 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -935,6 +935,462 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_me_d2c_batches_total Total DC->Client flush batches" + ); + let _ = writeln!(out, "# TYPE telemt_me_d2c_batches_total counter"); + let _ = writeln!( + out, + "telemt_me_d2c_batches_total {}", + if me_allows_normal { + stats.get_me_d2c_batches_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_batch_frames_total Total DC->Client frames flushed in batches" + ); + let _ = writeln!(out, "# TYPE telemt_me_d2c_batch_frames_total counter"); + let _ = writeln!( + out, + "telemt_me_d2c_batch_frames_total {}", + if me_allows_normal { + stats.get_me_d2c_batch_frames_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_batch_bytes_total Total DC->Client bytes flushed in batches" + ); + let _ = writeln!(out, "# TYPE telemt_me_d2c_batch_bytes_total counter"); + let _ = writeln!( + out, + "telemt_me_d2c_batch_bytes_total {}", + if me_allows_normal { + stats.get_me_d2c_batch_bytes_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_flush_reason_total DC->Client flush reasons" + ); + let _ = writeln!(out, "# TYPE telemt_me_d2c_flush_reason_total counter"); + let _ = writeln!( + out, + "telemt_me_d2c_flush_reason_total{{reason=\"queue_drain\"}} {}", + if me_allows_normal { + stats.get_me_d2c_flush_reason_queue_drain_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_flush_reason_total{{reason=\"batch_frames\"}} {}", + if me_allows_normal { + stats.get_me_d2c_flush_reason_batch_frames_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_flush_reason_total{{reason=\"batch_bytes\"}} {}", + if me_allows_normal { + stats.get_me_d2c_flush_reason_batch_bytes_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_flush_reason_total{{reason=\"max_delay\"}} {}", + if me_allows_normal { + stats.get_me_d2c_flush_reason_max_delay_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_flush_reason_total{{reason=\"ack_immediate\"}} {}", + if me_allows_normal { + stats.get_me_d2c_flush_reason_ack_immediate_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_flush_reason_total{{reason=\"close\"}} {}", + if me_allows_normal { + stats.get_me_d2c_flush_reason_close_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_data_frames_total DC->Client data frames" + ); + let _ = writeln!(out, "# TYPE telemt_me_d2c_data_frames_total counter"); + let _ = writeln!( + out, + "telemt_me_d2c_data_frames_total {}", + if me_allows_normal { + stats.get_me_d2c_data_frames_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_ack_frames_total DC->Client quick-ack frames" + ); + let _ = writeln!(out, "# TYPE telemt_me_d2c_ack_frames_total counter"); + let _ = writeln!( + out, + "telemt_me_d2c_ack_frames_total {}", + if me_allows_normal { + stats.get_me_d2c_ack_frames_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_payload_bytes_total DC->Client payload bytes before transport framing" + ); + let _ = writeln!(out, "# TYPE telemt_me_d2c_payload_bytes_total counter"); + let _ = writeln!( + out, + "telemt_me_d2c_payload_bytes_total {}", + if me_allows_normal { + stats.get_me_d2c_payload_bytes_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_write_mode_total DC->Client writer mode selection" + ); + let _ = writeln!(out, "# TYPE telemt_me_d2c_write_mode_total counter"); + let _ = writeln!( + out, + "telemt_me_d2c_write_mode_total{{mode=\"coalesced\"}} {}", + if me_allows_normal { + stats.get_me_d2c_write_mode_coalesced_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_write_mode_total{{mode=\"split\"}} {}", + if me_allows_normal { + stats.get_me_d2c_write_mode_split_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_quota_reject_total DC->Client quota rejects" + ); + let _ = writeln!(out, "# TYPE telemt_me_d2c_quota_reject_total counter"); + let _ = writeln!( + out, + "telemt_me_d2c_quota_reject_total{{stage=\"pre_write\"}} {}", + if me_allows_normal { + stats.get_me_d2c_quota_reject_pre_write_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_quota_reject_total{{stage=\"post_write\"}} {}", + if me_allows_normal { + stats.get_me_d2c_quota_reject_post_write_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_frame_buf_shrink_total DC->Client reusable frame buffer shrink events" + ); + let _ = writeln!(out, "# TYPE telemt_me_d2c_frame_buf_shrink_total counter"); + let _ = writeln!( + out, + "telemt_me_d2c_frame_buf_shrink_total {}", + if me_allows_normal { + stats.get_me_d2c_frame_buf_shrink_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_frame_buf_shrink_bytes_total DC->Client reusable frame buffer bytes released" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_d2c_frame_buf_shrink_bytes_total counter" + ); + let _ = writeln!( + out, + "telemt_me_d2c_frame_buf_shrink_bytes_total {}", + if me_allows_normal { + stats.get_me_d2c_frame_buf_shrink_bytes_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_batch_frames_bucket_total DC->Client batch frame count buckets" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_d2c_batch_frames_bucket_total counter" + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_frames_bucket_total{{bucket=\"1\"}} {}", + if me_allows_debug { + stats.get_me_d2c_batch_frames_bucket_1() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_frames_bucket_total{{bucket=\"2_4\"}} {}", + if me_allows_debug { + stats.get_me_d2c_batch_frames_bucket_2_4() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_frames_bucket_total{{bucket=\"5_8\"}} {}", + if me_allows_debug { + stats.get_me_d2c_batch_frames_bucket_5_8() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_frames_bucket_total{{bucket=\"9_16\"}} {}", + if me_allows_debug { + stats.get_me_d2c_batch_frames_bucket_9_16() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_frames_bucket_total{{bucket=\"17_32\"}} {}", + if me_allows_debug { + stats.get_me_d2c_batch_frames_bucket_17_32() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_frames_bucket_total{{bucket=\"gt_32\"}} {}", + if me_allows_debug { + stats.get_me_d2c_batch_frames_bucket_gt_32() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_batch_bytes_bucket_total DC->Client batch byte size buckets" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_d2c_batch_bytes_bucket_total counter" + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"0_1k\"}} {}", + if me_allows_debug { + stats.get_me_d2c_batch_bytes_bucket_0_1k() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"1k_4k\"}} {}", + if me_allows_debug { + stats.get_me_d2c_batch_bytes_bucket_1k_4k() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"4k_16k\"}} {}", + if me_allows_debug { + stats.get_me_d2c_batch_bytes_bucket_4k_16k() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"16k_64k\"}} {}", + if me_allows_debug { + stats.get_me_d2c_batch_bytes_bucket_16k_64k() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"64k_128k\"}} {}", + if me_allows_debug { + stats.get_me_d2c_batch_bytes_bucket_64k_128k() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_bytes_bucket_total{{bucket=\"gt_128k\"}} {}", + if me_allows_debug { + stats.get_me_d2c_batch_bytes_bucket_gt_128k() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_flush_duration_us_bucket_total DC->Client flush duration buckets" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_d2c_flush_duration_us_bucket_total counter" + ); + let _ = writeln!( + out, + "telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"0_50\"}} {}", + if me_allows_debug { + stats.get_me_d2c_flush_duration_us_bucket_0_50() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"51_200\"}} {}", + if me_allows_debug { + stats.get_me_d2c_flush_duration_us_bucket_51_200() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"201_1000\"}} {}", + if me_allows_debug { + stats.get_me_d2c_flush_duration_us_bucket_201_1000() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"1001_5000\"}} {}", + if me_allows_debug { + stats.get_me_d2c_flush_duration_us_bucket_1001_5000() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"5001_20000\"}} {}", + if me_allows_debug { + stats.get_me_d2c_flush_duration_us_bucket_5001_20000() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_d2c_flush_duration_us_bucket_total{{bucket=\"gt_20000\"}} {}", + if me_allows_debug { + stats.get_me_d2c_flush_duration_us_bucket_gt_20000() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_batch_timeout_armed_total DC->Client max-delay timer armed events" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_d2c_batch_timeout_armed_total counter" + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_timeout_armed_total {}", + if me_allows_debug { + stats.get_me_d2c_batch_timeout_armed_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_d2c_batch_timeout_fired_total DC->Client max-delay timer fired events" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_d2c_batch_timeout_fired_total counter" + ); + let _ = writeln!( + out, + "telemt_me_d2c_batch_timeout_fired_total {}", + if me_allows_debug { + stats.get_me_d2c_batch_timeout_fired_total() + } else { + 0 + } + ); + let _ = writeln!( out, "# HELP telemt_me_writer_pick_total ME writer-pick outcomes by mode and result" @@ -2145,6 +2601,16 @@ mod tests { stats.increment_relay_idle_hard_close_total(); stats.increment_relay_pressure_evict_total(); stats.increment_relay_protocol_desync_close_total(); + stats.increment_me_d2c_batches_total(); + stats.add_me_d2c_batch_frames_total(3); + stats.add_me_d2c_batch_bytes_total(2048); + stats.increment_me_d2c_flush_reason(crate::stats::MeD2cFlushReason::AckImmediate); + stats.increment_me_d2c_data_frames_total(); + stats.increment_me_d2c_ack_frames_total(); + stats.add_me_d2c_payload_bytes_total(1800); + stats.increment_me_d2c_write_mode(crate::stats::MeD2cWriteMode::Coalesced); + stats.increment_me_d2c_quota_reject_total(crate::stats::MeD2cQuotaRejectStage::PostWrite); + stats.observe_me_d2c_frame_buf_shrink(4096); stats.increment_user_connects("alice"); stats.increment_user_curr_connects("alice"); stats.add_user_octets_from("alice", 1024); @@ -2184,6 +2650,17 @@ mod tests { assert!(output.contains("telemt_relay_idle_hard_close_total 1")); assert!(output.contains("telemt_relay_pressure_evict_total 1")); assert!(output.contains("telemt_relay_protocol_desync_close_total 1")); + assert!(output.contains("telemt_me_d2c_batches_total 1")); + assert!(output.contains("telemt_me_d2c_batch_frames_total 3")); + assert!(output.contains("telemt_me_d2c_batch_bytes_total 2048")); + assert!(output.contains("telemt_me_d2c_flush_reason_total{reason=\"ack_immediate\"} 1")); + assert!(output.contains("telemt_me_d2c_data_frames_total 1")); + assert!(output.contains("telemt_me_d2c_ack_frames_total 1")); + assert!(output.contains("telemt_me_d2c_payload_bytes_total 1800")); + assert!(output.contains("telemt_me_d2c_write_mode_total{mode=\"coalesced\"} 1")); + assert!(output.contains("telemt_me_d2c_quota_reject_total{stage=\"post_write\"} 1")); + assert!(output.contains("telemt_me_d2c_frame_buf_shrink_total 1")); + assert!(output.contains("telemt_me_d2c_frame_buf_shrink_bytes_total 4096")); assert!(output.contains("telemt_user_connections_total{user=\"alice\"} 1")); assert!(output.contains("telemt_user_connections_current{user=\"alice\"} 1")); assert!(output.contains("telemt_user_octets_from_client{user=\"alice\"} 1024")); @@ -2245,6 +2722,11 @@ mod tests { assert!(output.contains("# TYPE telemt_relay_idle_hard_close_total counter")); assert!(output.contains("# TYPE telemt_relay_pressure_evict_total counter")); assert!(output.contains("# TYPE telemt_relay_protocol_desync_close_total counter")); + assert!(output.contains("# TYPE telemt_me_d2c_batches_total counter")); + assert!(output.contains("# TYPE telemt_me_d2c_flush_reason_total counter")); + assert!(output.contains("# TYPE telemt_me_d2c_write_mode_total counter")); + assert!(output.contains("# TYPE telemt_me_d2c_batch_frames_bucket_total counter")); + assert!(output.contains("# TYPE telemt_me_d2c_flush_duration_us_bucket_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); assert!( output diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 2c9fe0c..8b8d3dc 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -21,7 +21,7 @@ use crate::proxy::route_mode::{ ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, cutover_stagger_delay, }; -use crate::stats::Stats; +use crate::stats::{MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, Stats}; use crate::stream::{BufferPool, CryptoReader, CryptoWriter, PooledBuffer}; use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag}; @@ -574,6 +574,49 @@ fn quota_would_be_exceeded_for_user_soft( }) } +fn classify_me_d2c_flush_reason( + flush_immediately: bool, + batch_frames: usize, + max_frames: usize, + batch_bytes: usize, + max_bytes: usize, + max_delay_fired: bool, +) -> MeD2cFlushReason { + if flush_immediately { + return MeD2cFlushReason::AckImmediate; + } + if batch_frames >= max_frames { + return MeD2cFlushReason::BatchFrames; + } + if batch_bytes >= max_bytes { + return MeD2cFlushReason::BatchBytes; + } + if max_delay_fired { + return MeD2cFlushReason::MaxDelay; + } + MeD2cFlushReason::QueueDrain +} + +fn observe_me_d2c_flush_event( + stats: &Stats, + reason: MeD2cFlushReason, + batch_frames: usize, + batch_bytes: usize, + flush_duration_us: Option, +) { + stats.increment_me_d2c_flush_reason(reason); + if batch_frames > 0 || batch_bytes > 0 { + stats.increment_me_d2c_batches_total(); + stats.add_me_d2c_batch_frames_total(batch_frames as u64); + stats.add_me_d2c_batch_bytes_total(batch_bytes as u64); + stats.observe_me_d2c_batch_frames(batch_frames as u64); + stats.observe_me_d2c_batch_bytes(batch_bytes as u64); + } + if let Some(duration_us) = flush_duration_us { + stats.observe_me_d2c_flush_duration_us(duration_us); + } +} + #[cfg(test)] fn quota_user_lock_test_guard() -> &'static Mutex<()> { static TEST_LOCK: OnceLock> = OnceLock::new(); @@ -810,6 +853,7 @@ where let mut batch_frames = 0usize; let mut batch_bytes = 0usize; let mut flush_immediately; + let mut max_delay_fired = false; let first_is_downstream_activity = matches!(&first, MeResponse::Data { .. } | MeResponse::Ack(_)); @@ -838,7 +882,25 @@ where flush_immediately = immediate; } MeWriterResponseOutcome::Close => { + let flush_started_at = if stats_clone.telemetry_policy().me_level.allows_debug() { + Some(Instant::now()) + } else { + None + }; let _ = writer.flush().await; + let flush_duration_us = flush_started_at.map(|started| { + started + .elapsed() + .as_micros() + .min(u128::from(u64::MAX)) as u64 + }); + observe_me_d2c_flush_event( + stats_clone.as_ref(), + MeD2cFlushReason::Close, + batch_frames, + batch_bytes, + flush_duration_us, + ); return Ok(()); } } @@ -878,7 +940,27 @@ where flush_immediately |= immediate; } MeWriterResponseOutcome::Close => { + let flush_started_at = + if stats_clone.telemetry_policy().me_level.allows_debug() { + Some(Instant::now()) + } else { + None + }; let _ = writer.flush().await; + let flush_duration_us = flush_started_at.map(|started| { + started + .elapsed() + .as_micros() + .min(u128::from(u64::MAX)) + as u64 + }); + observe_me_d2c_flush_event( + stats_clone.as_ref(), + MeD2cFlushReason::Close, + batch_frames, + batch_bytes, + flush_duration_us, + ); return Ok(()); } } @@ -889,6 +971,7 @@ where && batch_frames < d2c_flush_policy.max_frames && batch_bytes < d2c_flush_policy.max_bytes { + stats_clone.increment_me_d2c_batch_timeout_armed_total(); match tokio::time::timeout(d2c_flush_policy.max_delay, me_rx_task.recv()).await { Ok(Some(next)) => { let next_is_downstream_activity = @@ -918,7 +1001,30 @@ where flush_immediately |= immediate; } MeWriterResponseOutcome::Close => { + let flush_started_at = if stats_clone + .telemetry_policy() + .me_level + .allows_debug() + { + Some(Instant::now()) + } else { + None + }; let _ = writer.flush().await; + let flush_duration_us = flush_started_at.map(|started| { + started + .elapsed() + .as_micros() + .min(u128::from(u64::MAX)) + as u64 + }); + observe_me_d2c_flush_event( + stats_clone.as_ref(), + MeD2cFlushReason::Close, + batch_frames, + batch_bytes, + flush_duration_us, + ); return Ok(()); } } @@ -958,7 +1064,30 @@ where flush_immediately |= immediate; } MeWriterResponseOutcome::Close => { + let flush_started_at = if stats_clone + .telemetry_policy() + .me_level + .allows_debug() + { + Some(Instant::now()) + } else { + None + }; let _ = writer.flush().await; + let flush_duration_us = flush_started_at.map(|started| { + started + .elapsed() + .as_micros() + .min(u128::from(u64::MAX)) + as u64 + }); + observe_me_d2c_flush_event( + stats_clone.as_ref(), + MeD2cFlushReason::Close, + batch_frames, + batch_bytes, + flush_duration_us, + ); return Ok(()); } } @@ -968,16 +1097,49 @@ where debug!(conn_id, "ME channel closed"); return Err(ProxyError::Proxy("ME connection lost".into())); } - Err(_) => {} + Err(_) => { + max_delay_fired = true; + stats_clone.increment_me_d2c_batch_timeout_fired_total(); + } } } + let flush_reason = classify_me_d2c_flush_reason( + flush_immediately, + batch_frames, + d2c_flush_policy.max_frames, + batch_bytes, + d2c_flush_policy.max_bytes, + max_delay_fired, + ); + let flush_started_at = if stats_clone.telemetry_policy().me_level.allows_debug() { + Some(Instant::now()) + } else { + None + }; writer.flush().await.map_err(ProxyError::Io)?; + let flush_duration_us = flush_started_at.map(|started| { + started + .elapsed() + .as_micros() + .min(u128::from(u64::MAX)) as u64 + }); + observe_me_d2c_flush_event( + stats_clone.as_ref(), + flush_reason, + batch_frames, + batch_bytes, + flush_duration_us, + ); let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes; let shrink_trigger = shrink_threshold .saturating_mul(ME_D2C_FRAME_BUF_SHRINK_HYSTERESIS_FACTOR); if frame_buf.capacity() > shrink_trigger { + let cap_before = frame_buf.capacity(); frame_buf.shrink_to(shrink_threshold); + let cap_after = frame_buf.capacity(); + let bytes_freed = cap_before.saturating_sub(cap_after) as u64; + stats_clone.observe_me_d2c_frame_buf_shrink(bytes_freed); } } _ = &mut stop_rx => { @@ -1552,15 +1714,21 @@ where data_len, quota_soft_overshoot_bytes, ) { + stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite); return Err(ProxyError::DataQuotaExceeded { user: user.to_string(), }); } - write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf).await?; + let write_mode = + write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf) + .await?; + stats.increment_me_d2c_write_mode(write_mode); bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed); stats.add_user_octets_to(user, data.len() as u64); + stats.increment_me_d2c_data_frames_total(); + stats.add_me_d2c_payload_bytes_total(data.len() as u64); if quota_exceeded_for_user_soft( stats, @@ -1568,6 +1736,7 @@ where quota_limit, quota_soft_overshoot_bytes, ) { + stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PostWrite); return Err(ProxyError::DataQuotaExceeded { user: user.to_string(), }); @@ -1586,6 +1755,7 @@ where trace!(conn_id, confirm, "ME->C quickack"); } write_client_ack(client_writer, proto_tag, confirm).await?; + stats.increment_me_d2c_ack_frames_total(); Ok(MeWriterResponseOutcome::Continue { frames: 1, @@ -1636,13 +1806,13 @@ async fn write_client_payload( data: &[u8], rng: &SecureRandom, frame_buf: &mut Vec, -) -> Result<()> +) -> Result where W: AsyncWrite + Unpin + Send + 'static, { let quickack = (flags & RPC_FLAG_QUICKACK) != 0; - match proto_tag { + let write_mode = match proto_tag { ProtoTag::Abridged => { if !data.len().is_multiple_of(4) { return Err(ProxyError::Proxy(format!( @@ -1667,10 +1837,12 @@ where .write_all(frame_buf.as_slice()) .await .map_err(ProxyError::Io)?; + MeD2cWriteMode::Coalesced } else { let header = [first]; client_writer.write_all(&header).await.map_err(ProxyError::Io)?; client_writer.write_all(data).await.map_err(ProxyError::Io)?; + MeD2cWriteMode::Split } } else if len_words < (1 << 24) { let mut first = 0x7fu8; @@ -1688,10 +1860,12 @@ where .write_all(frame_buf.as_slice()) .await .map_err(ProxyError::Io)?; + MeD2cWriteMode::Coalesced } else { let header = [first, lw[0], lw[1], lw[2]]; client_writer.write_all(&header).await.map_err(ProxyError::Io)?; client_writer.write_all(data).await.map_err(ProxyError::Io)?; + MeD2cWriteMode::Split } } else { return Err(ProxyError::Proxy(format!( @@ -1729,6 +1903,7 @@ where .write_all(frame_buf.as_slice()) .await .map_err(ProxyError::Io)?; + MeD2cWriteMode::Coalesced } else { let header = len_val.to_le_bytes(); client_writer.write_all(&header).await.map_err(ProxyError::Io)?; @@ -1745,11 +1920,12 @@ where .await .map_err(ProxyError::Io)?; } + MeD2cWriteMode::Split } } - } + }; - Ok(()) + Ok(write_mode) } async fn write_client_ack( diff --git a/src/stats/mod.rs b/src/stats/mod.rs index bdabe81..d13d834 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -26,6 +26,28 @@ enum RouteConnectionGauge { Middle, } +#[derive(Debug, Clone, Copy)] +pub enum MeD2cFlushReason { + QueueDrain, + BatchFrames, + BatchBytes, + MaxDelay, + AckImmediate, + Close, +} + +#[derive(Debug, Clone, Copy)] +pub enum MeD2cWriteMode { + Coalesced, + Split, +} + +#[derive(Debug, Clone, Copy)] +pub enum MeD2cQuotaRejectStage { + PreWrite, + PostWrite, +} + #[must_use = "RouteConnectionLease must be kept alive to hold the connection gauge increment"] pub struct RouteConnectionLease { stats: Arc, @@ -140,6 +162,44 @@ pub struct Stats { me_route_drop_queue_full: AtomicU64, me_route_drop_queue_full_base: AtomicU64, me_route_drop_queue_full_high: AtomicU64, + me_d2c_batches_total: AtomicU64, + me_d2c_batch_frames_total: AtomicU64, + me_d2c_batch_bytes_total: AtomicU64, + me_d2c_flush_reason_queue_drain_total: AtomicU64, + me_d2c_flush_reason_batch_frames_total: AtomicU64, + me_d2c_flush_reason_batch_bytes_total: AtomicU64, + me_d2c_flush_reason_max_delay_total: AtomicU64, + me_d2c_flush_reason_ack_immediate_total: AtomicU64, + me_d2c_flush_reason_close_total: AtomicU64, + me_d2c_data_frames_total: AtomicU64, + me_d2c_ack_frames_total: AtomicU64, + me_d2c_payload_bytes_total: AtomicU64, + me_d2c_write_mode_coalesced_total: AtomicU64, + me_d2c_write_mode_split_total: AtomicU64, + me_d2c_quota_reject_pre_write_total: AtomicU64, + me_d2c_quota_reject_post_write_total: AtomicU64, + me_d2c_frame_buf_shrink_total: AtomicU64, + me_d2c_frame_buf_shrink_bytes_total: AtomicU64, + me_d2c_batch_frames_bucket_1: AtomicU64, + me_d2c_batch_frames_bucket_2_4: AtomicU64, + me_d2c_batch_frames_bucket_5_8: AtomicU64, + me_d2c_batch_frames_bucket_9_16: AtomicU64, + me_d2c_batch_frames_bucket_17_32: AtomicU64, + me_d2c_batch_frames_bucket_gt_32: AtomicU64, + me_d2c_batch_bytes_bucket_0_1k: AtomicU64, + me_d2c_batch_bytes_bucket_1k_4k: AtomicU64, + me_d2c_batch_bytes_bucket_4k_16k: AtomicU64, + me_d2c_batch_bytes_bucket_16k_64k: AtomicU64, + me_d2c_batch_bytes_bucket_64k_128k: AtomicU64, + me_d2c_batch_bytes_bucket_gt_128k: AtomicU64, + me_d2c_flush_duration_us_bucket_0_50: AtomicU64, + me_d2c_flush_duration_us_bucket_51_200: AtomicU64, + me_d2c_flush_duration_us_bucket_201_1000: AtomicU64, + me_d2c_flush_duration_us_bucket_1001_5000: AtomicU64, + me_d2c_flush_duration_us_bucket_5001_20000: AtomicU64, + me_d2c_flush_duration_us_bucket_gt_20000: AtomicU64, + me_d2c_batch_timeout_armed_total: AtomicU64, + me_d2c_batch_timeout_fired_total: AtomicU64, me_writer_pick_sorted_rr_success_try_total: AtomicU64, me_writer_pick_sorted_rr_success_fallback_total: AtomicU64, me_writer_pick_sorted_rr_full_total: AtomicU64, @@ -594,6 +654,215 @@ impl Stats { .fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_d2c_batches_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_d2c_batches_total.fetch_add(1, Ordering::Relaxed); + } + } + pub fn add_me_d2c_batch_frames_total(&self, frames: u64) { + if self.telemetry_me_allows_normal() { + self.me_d2c_batch_frames_total + .fetch_add(frames, Ordering::Relaxed); + } + } + pub fn add_me_d2c_batch_bytes_total(&self, bytes: u64) { + if self.telemetry_me_allows_normal() { + self.me_d2c_batch_bytes_total + .fetch_add(bytes, Ordering::Relaxed); + } + } + pub fn increment_me_d2c_flush_reason(&self, reason: MeD2cFlushReason) { + if !self.telemetry_me_allows_normal() { + return; + } + match reason { + MeD2cFlushReason::QueueDrain => { + self.me_d2c_flush_reason_queue_drain_total + .fetch_add(1, Ordering::Relaxed); + } + MeD2cFlushReason::BatchFrames => { + self.me_d2c_flush_reason_batch_frames_total + .fetch_add(1, Ordering::Relaxed); + } + MeD2cFlushReason::BatchBytes => { + self.me_d2c_flush_reason_batch_bytes_total + .fetch_add(1, Ordering::Relaxed); + } + MeD2cFlushReason::MaxDelay => { + self.me_d2c_flush_reason_max_delay_total + .fetch_add(1, Ordering::Relaxed); + } + MeD2cFlushReason::AckImmediate => { + self.me_d2c_flush_reason_ack_immediate_total + .fetch_add(1, Ordering::Relaxed); + } + MeD2cFlushReason::Close => { + self.me_d2c_flush_reason_close_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_d2c_data_frames_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_d2c_data_frames_total.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_d2c_ack_frames_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_d2c_ack_frames_total.fetch_add(1, Ordering::Relaxed); + } + } + pub fn add_me_d2c_payload_bytes_total(&self, bytes: u64) { + if self.telemetry_me_allows_normal() { + self.me_d2c_payload_bytes_total + .fetch_add(bytes, Ordering::Relaxed); + } + } + pub fn increment_me_d2c_write_mode(&self, mode: MeD2cWriteMode) { + if !self.telemetry_me_allows_normal() { + return; + } + match mode { + MeD2cWriteMode::Coalesced => { + self.me_d2c_write_mode_coalesced_total + .fetch_add(1, Ordering::Relaxed); + } + MeD2cWriteMode::Split => { + self.me_d2c_write_mode_split_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_d2c_quota_reject_total(&self, stage: MeD2cQuotaRejectStage) { + if !self.telemetry_me_allows_normal() { + return; + } + match stage { + MeD2cQuotaRejectStage::PreWrite => { + self.me_d2c_quota_reject_pre_write_total + .fetch_add(1, Ordering::Relaxed); + } + MeD2cQuotaRejectStage::PostWrite => { + self.me_d2c_quota_reject_post_write_total + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn observe_me_d2c_frame_buf_shrink(&self, bytes_freed: u64) { + if !self.telemetry_me_allows_normal() { + return; + } + self.me_d2c_frame_buf_shrink_total + .fetch_add(1, Ordering::Relaxed); + self.me_d2c_frame_buf_shrink_bytes_total + .fetch_add(bytes_freed, Ordering::Relaxed); + } + pub fn observe_me_d2c_batch_frames(&self, frames: u64) { + if !self.telemetry_me_allows_debug() { + return; + } + match frames { + 0 => {} + 1 => { + self.me_d2c_batch_frames_bucket_1 + .fetch_add(1, Ordering::Relaxed); + } + 2..=4 => { + self.me_d2c_batch_frames_bucket_2_4 + .fetch_add(1, Ordering::Relaxed); + } + 5..=8 => { + self.me_d2c_batch_frames_bucket_5_8 + .fetch_add(1, Ordering::Relaxed); + } + 9..=16 => { + self.me_d2c_batch_frames_bucket_9_16 + .fetch_add(1, Ordering::Relaxed); + } + 17..=32 => { + self.me_d2c_batch_frames_bucket_17_32 + .fetch_add(1, Ordering::Relaxed); + } + _ => { + self.me_d2c_batch_frames_bucket_gt_32 + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn observe_me_d2c_batch_bytes(&self, bytes: u64) { + if !self.telemetry_me_allows_debug() { + return; + } + match bytes { + 0..=1024 => { + self.me_d2c_batch_bytes_bucket_0_1k + .fetch_add(1, Ordering::Relaxed); + } + 1025..=4096 => { + self.me_d2c_batch_bytes_bucket_1k_4k + .fetch_add(1, Ordering::Relaxed); + } + 4097..=16_384 => { + self.me_d2c_batch_bytes_bucket_4k_16k + .fetch_add(1, Ordering::Relaxed); + } + 16_385..=65_536 => { + self.me_d2c_batch_bytes_bucket_16k_64k + .fetch_add(1, Ordering::Relaxed); + } + 65_537..=131_072 => { + self.me_d2c_batch_bytes_bucket_64k_128k + .fetch_add(1, Ordering::Relaxed); + } + _ => { + self.me_d2c_batch_bytes_bucket_gt_128k + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn observe_me_d2c_flush_duration_us(&self, duration_us: u64) { + if !self.telemetry_me_allows_debug() { + return; + } + match duration_us { + 0..=50 => { + self.me_d2c_flush_duration_us_bucket_0_50 + .fetch_add(1, Ordering::Relaxed); + } + 51..=200 => { + self.me_d2c_flush_duration_us_bucket_51_200 + .fetch_add(1, Ordering::Relaxed); + } + 201..=1000 => { + self.me_d2c_flush_duration_us_bucket_201_1000 + .fetch_add(1, Ordering::Relaxed); + } + 1001..=5000 => { + self.me_d2c_flush_duration_us_bucket_1001_5000 + .fetch_add(1, Ordering::Relaxed); + } + 5001..=20_000 => { + self.me_d2c_flush_duration_us_bucket_5001_20000 + .fetch_add(1, Ordering::Relaxed); + } + _ => { + self.me_d2c_flush_duration_us_bucket_gt_20000 + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn increment_me_d2c_batch_timeout_armed_total(&self) { + if self.telemetry_me_allows_debug() { + self.me_d2c_batch_timeout_armed_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_d2c_batch_timeout_fired_total(&self) { + if self.telemetry_me_allows_debug() { + self.me_d2c_batch_timeout_fired_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_writer_pick_success_try_total(&self, mode: MeWriterPickMode) { if !self.telemetry_me_allows_normal() { return; @@ -1229,6 +1498,142 @@ impl Stats { pub fn get_me_route_drop_queue_full_high(&self) -> u64 { self.me_route_drop_queue_full_high.load(Ordering::Relaxed) } + pub fn get_me_d2c_batches_total(&self) -> u64 { + self.me_d2c_batches_total.load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_frames_total(&self) -> u64 { + self.me_d2c_batch_frames_total.load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_bytes_total(&self) -> u64 { + self.me_d2c_batch_bytes_total.load(Ordering::Relaxed) + } + pub fn get_me_d2c_flush_reason_queue_drain_total(&self) -> u64 { + self.me_d2c_flush_reason_queue_drain_total + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_flush_reason_batch_frames_total(&self) -> u64 { + self.me_d2c_flush_reason_batch_frames_total + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_flush_reason_batch_bytes_total(&self) -> u64 { + self.me_d2c_flush_reason_batch_bytes_total + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_flush_reason_max_delay_total(&self) -> u64 { + self.me_d2c_flush_reason_max_delay_total + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_flush_reason_ack_immediate_total(&self) -> u64 { + self.me_d2c_flush_reason_ack_immediate_total + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_flush_reason_close_total(&self) -> u64 { + self.me_d2c_flush_reason_close_total.load(Ordering::Relaxed) + } + pub fn get_me_d2c_data_frames_total(&self) -> u64 { + self.me_d2c_data_frames_total.load(Ordering::Relaxed) + } + pub fn get_me_d2c_ack_frames_total(&self) -> u64 { + self.me_d2c_ack_frames_total.load(Ordering::Relaxed) + } + pub fn get_me_d2c_payload_bytes_total(&self) -> u64 { + self.me_d2c_payload_bytes_total.load(Ordering::Relaxed) + } + pub fn get_me_d2c_write_mode_coalesced_total(&self) -> u64 { + self.me_d2c_write_mode_coalesced_total + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_write_mode_split_total(&self) -> u64 { + self.me_d2c_write_mode_split_total.load(Ordering::Relaxed) + } + pub fn get_me_d2c_quota_reject_pre_write_total(&self) -> u64 { + self.me_d2c_quota_reject_pre_write_total + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_quota_reject_post_write_total(&self) -> u64 { + self.me_d2c_quota_reject_post_write_total + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_frame_buf_shrink_total(&self) -> u64 { + self.me_d2c_frame_buf_shrink_total.load(Ordering::Relaxed) + } + pub fn get_me_d2c_frame_buf_shrink_bytes_total(&self) -> u64 { + self.me_d2c_frame_buf_shrink_bytes_total + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_frames_bucket_1(&self) -> u64 { + self.me_d2c_batch_frames_bucket_1.load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_frames_bucket_2_4(&self) -> u64 { + self.me_d2c_batch_frames_bucket_2_4.load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_frames_bucket_5_8(&self) -> u64 { + self.me_d2c_batch_frames_bucket_5_8.load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_frames_bucket_9_16(&self) -> u64 { + self.me_d2c_batch_frames_bucket_9_16.load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_frames_bucket_17_32(&self) -> u64 { + self.me_d2c_batch_frames_bucket_17_32 + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_frames_bucket_gt_32(&self) -> u64 { + self.me_d2c_batch_frames_bucket_gt_32 + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_bytes_bucket_0_1k(&self) -> u64 { + self.me_d2c_batch_bytes_bucket_0_1k.load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_bytes_bucket_1k_4k(&self) -> u64 { + self.me_d2c_batch_bytes_bucket_1k_4k.load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_bytes_bucket_4k_16k(&self) -> u64 { + self.me_d2c_batch_bytes_bucket_4k_16k.load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_bytes_bucket_16k_64k(&self) -> u64 { + self.me_d2c_batch_bytes_bucket_16k_64k + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_bytes_bucket_64k_128k(&self) -> u64 { + self.me_d2c_batch_bytes_bucket_64k_128k + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_bytes_bucket_gt_128k(&self) -> u64 { + self.me_d2c_batch_bytes_bucket_gt_128k + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_flush_duration_us_bucket_0_50(&self) -> u64 { + self.me_d2c_flush_duration_us_bucket_0_50 + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_flush_duration_us_bucket_51_200(&self) -> u64 { + self.me_d2c_flush_duration_us_bucket_51_200 + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_flush_duration_us_bucket_201_1000(&self) -> u64 { + self.me_d2c_flush_duration_us_bucket_201_1000 + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_flush_duration_us_bucket_1001_5000(&self) -> u64 { + self.me_d2c_flush_duration_us_bucket_1001_5000 + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_flush_duration_us_bucket_5001_20000(&self) -> u64 { + self.me_d2c_flush_duration_us_bucket_5001_20000 + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_flush_duration_us_bucket_gt_20000(&self) -> u64 { + self.me_d2c_flush_duration_us_bucket_gt_20000 + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_timeout_armed_total(&self) -> u64 { + self.me_d2c_batch_timeout_armed_total + .load(Ordering::Relaxed) + } + pub fn get_me_d2c_batch_timeout_fired_total(&self) -> u64 { + self.me_d2c_batch_timeout_fired_total + .load(Ordering::Relaxed) + } pub fn get_me_writer_pick_sorted_rr_success_try_total(&self) -> u64 { self.me_writer_pick_sorted_rr_success_try_total .load(Ordering::Relaxed) @@ -1898,9 +2303,83 @@ mod tests { stats.increment_me_crc_mismatch(); stats.increment_me_keepalive_sent(); stats.increment_me_route_drop_queue_full(); + stats.increment_me_d2c_batches_total(); + stats.add_me_d2c_batch_frames_total(4); + stats.add_me_d2c_batch_bytes_total(4096); + stats.increment_me_d2c_flush_reason(MeD2cFlushReason::BatchBytes); + stats.increment_me_d2c_write_mode(MeD2cWriteMode::Coalesced); + stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite); + stats.observe_me_d2c_frame_buf_shrink(1024); + stats.observe_me_d2c_batch_frames(4); + stats.observe_me_d2c_batch_bytes(4096); + stats.observe_me_d2c_flush_duration_us(120); + stats.increment_me_d2c_batch_timeout_armed_total(); + stats.increment_me_d2c_batch_timeout_fired_total(); assert_eq!(stats.get_me_crc_mismatch(), 0); assert_eq!(stats.get_me_keepalive_sent(), 0); assert_eq!(stats.get_me_route_drop_queue_full(), 0); + assert_eq!(stats.get_me_d2c_batches_total(), 0); + assert_eq!(stats.get_me_d2c_flush_reason_batch_bytes_total(), 0); + assert_eq!(stats.get_me_d2c_write_mode_coalesced_total(), 0); + assert_eq!(stats.get_me_d2c_quota_reject_pre_write_total(), 0); + assert_eq!(stats.get_me_d2c_frame_buf_shrink_total(), 0); + assert_eq!(stats.get_me_d2c_batch_frames_bucket_2_4(), 0); + assert_eq!(stats.get_me_d2c_batch_bytes_bucket_1k_4k(), 0); + assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_51_200(), 0); + assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 0); + assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 0); + } + + #[test] + fn test_telemetry_policy_me_normal_blocks_d2c_debug_metrics() { + let stats = Stats::new(); + stats.apply_telemetry_policy(TelemetryPolicy { + core_enabled: true, + user_enabled: true, + me_level: MeTelemetryLevel::Normal, + }); + + stats.increment_me_d2c_batches_total(); + stats.add_me_d2c_batch_frames_total(2); + stats.add_me_d2c_batch_bytes_total(2048); + stats.increment_me_d2c_flush_reason(MeD2cFlushReason::QueueDrain); + stats.observe_me_d2c_batch_frames(2); + stats.observe_me_d2c_batch_bytes(2048); + stats.observe_me_d2c_flush_duration_us(100); + stats.increment_me_d2c_batch_timeout_armed_total(); + stats.increment_me_d2c_batch_timeout_fired_total(); + + assert_eq!(stats.get_me_d2c_batches_total(), 1); + assert_eq!(stats.get_me_d2c_batch_frames_total(), 2); + assert_eq!(stats.get_me_d2c_batch_bytes_total(), 2048); + assert_eq!(stats.get_me_d2c_flush_reason_queue_drain_total(), 1); + assert_eq!(stats.get_me_d2c_batch_frames_bucket_2_4(), 0); + assert_eq!(stats.get_me_d2c_batch_bytes_bucket_1k_4k(), 0); + assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_51_200(), 0); + assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 0); + assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 0); + } + + #[test] + fn test_telemetry_policy_me_debug_enables_d2c_debug_metrics() { + let stats = Stats::new(); + stats.apply_telemetry_policy(TelemetryPolicy { + core_enabled: true, + user_enabled: true, + me_level: MeTelemetryLevel::Debug, + }); + + stats.observe_me_d2c_batch_frames(7); + stats.observe_me_d2c_batch_bytes(70_000); + stats.observe_me_d2c_flush_duration_us(1400); + stats.increment_me_d2c_batch_timeout_armed_total(); + stats.increment_me_d2c_batch_timeout_fired_total(); + + assert_eq!(stats.get_me_d2c_batch_frames_bucket_5_8(), 1); + assert_eq!(stats.get_me_d2c_batch_bytes_bucket_64k_128k(), 1); + assert_eq!(stats.get_me_d2c_flush_duration_us_bucket_1001_5000(), 1); + assert_eq!(stats.get_me_d2c_batch_timeout_armed_total(), 1); + assert_eq!(stats.get_me_d2c_batch_timeout_fired_total(), 1); } #[test]