diff --git a/src/metrics.rs b/src/metrics.rs index c05737f..091ce52 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -293,6 +293,27 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_buffer_pool_buffers_total Snapshot of pooled and allocated buffers" + ); + let _ = writeln!(out, "# TYPE telemt_buffer_pool_buffers_total gauge"); + let _ = writeln!( + out, + "telemt_buffer_pool_buffers_total{{kind=\"pooled\"}} {}", + stats.get_buffer_pool_pooled_gauge() + ); + let _ = writeln!( + out, + "telemt_buffer_pool_buffers_total{{kind=\"allocated\"}} {}", + stats.get_buffer_pool_allocated_gauge() + ); + let _ = writeln!( + out, + "telemt_buffer_pool_buffers_total{{kind=\"in_use\"}} {}", + stats.get_buffer_pool_in_use_gauge() + ); + let _ = writeln!( out, "# HELP telemt_connections_total Total accepted connections" @@ -941,6 +962,39 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_me_c2me_enqueue_events_total ME client->ME enqueue outcomes" + ); + let _ = writeln!(out, "# TYPE telemt_me_c2me_enqueue_events_total counter"); + let _ = writeln!( + out, + "telemt_me_c2me_enqueue_events_total{{event=\"full\"}} {}", + if me_allows_normal { + stats.get_me_c2me_send_full_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_c2me_enqueue_events_total{{event=\"high_water\"}} {}", + if me_allows_normal { + stats.get_me_c2me_send_high_water_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_c2me_enqueue_events_total{{event=\"timeout\"}} {}", + if me_allows_normal { + stats.get_me_c2me_send_timeout_total() + } else { + 0 + } + ); + let _ = writeln!( out, "# HELP telemt_me_d2c_batches_total Total DC->Client flush batches" diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index 32f2370..71ebf38 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -323,6 +323,12 @@ where } buffer_pool_trim.trim_to(buffer_pool_trim.max_buffers().min(64)); + let pool_snapshot = buffer_pool_trim.stats(); + stats.set_buffer_pool_gauges( + pool_snapshot.pooled, + pool_snapshot.allocated, + pool_snapshot.allocated.saturating_sub(pool_snapshot.pooled), + ); relay_result } diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 01833fc..85c1844 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -645,11 +645,14 @@ async fn enqueue_c2me_command( tx: &mpsc::Sender, cmd: C2MeCommand, send_timeout: Option, + stats: &Stats, ) -> std::result::Result<(), mpsc::error::SendError> { match tx.try_send(cmd) { Ok(()) => Ok(()), Err(mpsc::error::TrySendError::Closed(cmd)) => Err(mpsc::error::SendError(cmd)), Err(mpsc::error::TrySendError::Full(cmd)) => { + stats.increment_me_c2me_send_full_total(); + stats.increment_me_c2me_send_high_water_total(); note_relay_pressure_event(); // Cooperative yield reduces burst catch-up when the per-conn queue is near saturation. if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS { @@ -658,7 +661,10 @@ async fn enqueue_c2me_command( let reserve_result = match send_timeout { Some(send_timeout) => match timeout(send_timeout, tx.reserve()).await { Ok(result) => result, - Err(_) => return Err(mpsc::error::SendError(cmd)), + Err(_) => { + stats.increment_me_c2me_send_timeout_total(); + return Err(mpsc::error::SendError(cmd)); + } }, None => tx.reserve().await, }; @@ -667,7 +673,10 @@ async fn enqueue_c2me_command( permit.send(cmd); Ok(()) } - Err(_) => Err(mpsc::error::SendError(cmd)), + Err(_) => { + stats.increment_me_c2me_send_timeout_total(); + Err(mpsc::error::SendError(cmd)) + } } } } @@ -1190,7 +1199,9 @@ where user = %user, "Middle-relay pressure eviction for idle-candidate session" ); - let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await; + let _ = + enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout, stats.as_ref()) + .await; main_result = Err(ProxyError::Proxy( "middle-relay session evicted under pressure (idle-candidate)".to_string(), )); @@ -1209,7 +1220,9 @@ where "Cutover affected middle session, closing client connection" ); tokio::time::sleep(delay).await; - let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await; + let _ = + enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout, stats.as_ref()) + .await; main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string())); break; } @@ -1271,6 +1284,7 @@ where &c2me_tx, C2MeCommand::Data { payload, flags }, c2me_send_timeout, + stats.as_ref(), ) .await .is_err() @@ -1282,9 +1296,13 @@ where Ok(None) => { debug!(conn_id, "Client EOF"); client_closed = true; - let _ = - enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout) - .await; + let _ = enqueue_c2me_command( + &c2me_tx, + C2MeCommand::Close, + c2me_send_timeout, + stats.as_ref(), + ) + .await; break; } Err(e) => { @@ -1336,6 +1354,12 @@ where clear_relay_idle_candidate(conn_id); me_pool.registry().unregister(conn_id).await; buffer_pool.trim_to(buffer_pool.max_buffers().min(64)); + let pool_snapshot = buffer_pool.stats(); + stats.set_buffer_pool_gauges( + pool_snapshot.pooled, + pool_snapshot.allocated, + pool_snapshot.allocated.saturating_sub(pool_snapshot.pooled), + ); result } diff --git a/src/proxy/tests/middle_relay_stub_completion_security_tests.rs b/src/proxy/tests/middle_relay_stub_completion_security_tests.rs index fbb9081..978ee29 100644 --- a/src/proxy/tests/middle_relay_stub_completion_security_tests.rs +++ b/src/proxy/tests/middle_relay_stub_completion_security_tests.rs @@ -1,4 +1,5 @@ use super::*; +use crate::stats::Stats; use crate::stream::BufferPool; use std::collections::HashSet; use std::sync::Arc; @@ -119,6 +120,7 @@ async fn c2me_channel_full_path_yields_then_sends() { .expect("priming queue with one frame must succeed"); let tx2 = tx.clone(); + let stats = Stats::default(); let producer = tokio::spawn(async move { enqueue_c2me_command( &tx2, @@ -127,6 +129,7 @@ async fn c2me_channel_full_path_yields_then_sends() { flags: 2, }, None, + &stats, ) .await }); diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 18cf360..a87ec7e 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -200,6 +200,14 @@ pub struct Stats { me_d2c_flush_duration_us_bucket_1001_5000: AtomicU64, me_d2c_flush_duration_us_bucket_5001_20000: AtomicU64, me_d2c_flush_duration_us_bucket_gt_20000: AtomicU64, + // Buffer pool gauges + buffer_pool_pooled_gauge: AtomicU64, + buffer_pool_allocated_gauge: AtomicU64, + buffer_pool_in_use_gauge: AtomicU64, + // C2ME enqueue observability + me_c2me_send_full_total: AtomicU64, + me_c2me_send_high_water_total: AtomicU64, + me_c2me_send_timeout_total: AtomicU64, me_d2c_batch_timeout_armed_total: AtomicU64, me_d2c_batch_timeout_fired_total: AtomicU64, me_writer_pick_sorted_rr_success_try_total: AtomicU64, @@ -1414,6 +1422,38 @@ impl Stats { .store(value, Ordering::Relaxed); } } + + pub fn set_buffer_pool_gauges(&self, pooled: usize, allocated: usize, in_use: usize) { + if self.telemetry_me_allows_normal() { + self.buffer_pool_pooled_gauge + .store(pooled as u64, Ordering::Relaxed); + self.buffer_pool_allocated_gauge + .store(allocated as u64, Ordering::Relaxed); + self.buffer_pool_in_use_gauge + .store(in_use as u64, Ordering::Relaxed); + } + } + + pub fn increment_me_c2me_send_full_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_c2me_send_full_total + .fetch_add(1, Ordering::Relaxed); + } + } + + pub fn increment_me_c2me_send_high_water_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_c2me_send_high_water_total + .fetch_add(1, Ordering::Relaxed); + } + } + + pub fn increment_me_c2me_send_timeout_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_c2me_send_timeout_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_floor_cap_block_total(&self) { if self.telemetry_me_allows_normal() { self.me_floor_cap_block_total @@ -1780,6 +1820,32 @@ impl Stats { self.me_d2c_flush_duration_us_bucket_gt_20000 .load(Ordering::Relaxed) } + + pub fn get_buffer_pool_pooled_gauge(&self) -> u64 { + self.buffer_pool_pooled_gauge.load(Ordering::Relaxed) + } + + pub fn get_buffer_pool_allocated_gauge(&self) -> u64 { + self.buffer_pool_allocated_gauge.load(Ordering::Relaxed) + } + + pub fn get_buffer_pool_in_use_gauge(&self) -> u64 { + self.buffer_pool_in_use_gauge.load(Ordering::Relaxed) + } + + pub fn get_me_c2me_send_full_total(&self) -> u64 { + self.me_c2me_send_full_total.load(Ordering::Relaxed) + } + + pub fn get_me_c2me_send_high_water_total(&self) -> u64 { + self.me_c2me_send_high_water_total + .load(Ordering::Relaxed) + } + + pub fn get_me_c2me_send_timeout_total(&self) -> u64 { + self.me_c2me_send_timeout_total + .load(Ordering::Relaxed) + } pub fn get_me_d2c_batch_timeout_armed_total(&self) -> u64 { self.me_d2c_batch_timeout_armed_total .load(Ordering::Relaxed)