Merge branch 'flow' into feature/metrics-build-info

This commit is contained in:
Alexey
2026-04-07 18:34:51 +03:00
committed by GitHub
77 changed files with 12010 additions and 2166 deletions

View File

@@ -15,6 +15,7 @@ use tracing::{debug, info, warn};
use crate::config::ProxyConfig;
use crate::ip_tracker::UserIpTracker;
use crate::proxy::shared_state::ProxySharedState;
use crate::stats::Stats;
use crate::stats::beobachten::BeobachtenStore;
use crate::transport::{ListenOptions, create_listener};
@@ -25,6 +26,7 @@ pub async fn serve(
listen_backlog: u32,
stats: Arc<Stats>,
beobachten: Arc<BeobachtenStore>,
shared_state: Arc<ProxySharedState>,
ip_tracker: Arc<UserIpTracker>,
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Vec<IpNetwork>,
@@ -45,7 +47,13 @@ pub async fn serve(
Ok(listener) => {
info!("Metrics endpoint: http://{}/metrics and /beobachten", addr);
serve_listener(
listener, stats, beobachten, ip_tracker, config_rx, whitelist,
listener,
stats,
beobachten,
shared_state,
ip_tracker,
config_rx,
whitelist,
)
.await;
}
@@ -94,13 +102,20 @@ pub async fn serve(
}
(Some(listener), None) | (None, Some(listener)) => {
serve_listener(
listener, stats, beobachten, ip_tracker, config_rx, whitelist,
listener,
stats,
beobachten,
shared_state,
ip_tracker,
config_rx,
whitelist,
)
.await;
}
(Some(listener4), Some(listener6)) => {
let stats_v6 = stats.clone();
let beobachten_v6 = beobachten.clone();
let shared_state_v6 = shared_state.clone();
let ip_tracker_v6 = ip_tracker.clone();
let config_rx_v6 = config_rx.clone();
let whitelist_v6 = whitelist.clone();
@@ -109,6 +124,7 @@ pub async fn serve(
listener6,
stats_v6,
beobachten_v6,
shared_state_v6,
ip_tracker_v6,
config_rx_v6,
whitelist_v6,
@@ -116,7 +132,13 @@ pub async fn serve(
.await;
});
serve_listener(
listener4, stats, beobachten, ip_tracker, config_rx, whitelist,
listener4,
stats,
beobachten,
shared_state,
ip_tracker,
config_rx,
whitelist,
)
.await;
}
@@ -142,6 +164,7 @@ async fn serve_listener(
listener: TcpListener,
stats: Arc<Stats>,
beobachten: Arc<BeobachtenStore>,
shared_state: Arc<ProxySharedState>,
ip_tracker: Arc<UserIpTracker>,
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Arc<Vec<IpNetwork>>,
@@ -162,15 +185,19 @@ async fn serve_listener(
let stats = stats.clone();
let beobachten = beobachten.clone();
let shared_state = shared_state.clone();
let ip_tracker = ip_tracker.clone();
let config_rx_conn = config_rx.clone();
tokio::spawn(async move {
let svc = service_fn(move |req| {
let stats = stats.clone();
let beobachten = beobachten.clone();
let shared_state = shared_state.clone();
let ip_tracker = ip_tracker.clone();
let config = config_rx_conn.borrow().clone();
async move { handle(req, &stats, &beobachten, &ip_tracker, &config).await }
async move {
handle(req, &stats, &beobachten, &shared_state, &ip_tracker, &config).await
}
});
if let Err(e) = http1::Builder::new()
.serve_connection(hyper_util::rt::TokioIo::new(stream), svc)
@@ -186,11 +213,12 @@ async fn handle<B>(
req: Request<B>,
stats: &Stats,
beobachten: &BeobachtenStore,
shared_state: &ProxySharedState,
ip_tracker: &UserIpTracker,
config: &ProxyConfig,
) -> Result<Response<Full<Bytes>>, Infallible> {
if req.uri().path() == "/metrics" {
let body = render_metrics(stats, config, ip_tracker).await;
let body = render_metrics(stats, shared_state, config, ip_tracker).await;
let resp = Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/plain; version=0.0.4; charset=utf-8")
@@ -225,7 +253,12 @@ fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> Stri
beobachten.snapshot_text(ttl)
}
async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIpTracker) -> String {
async fn render_metrics(
stats: &Stats,
shared_state: &ProxySharedState,
config: &ProxyConfig,
ip_tracker: &UserIpTracker,
) -> String {
use std::fmt::Write;
let mut out = String::with_capacity(4096);
let telemetry = stats.telemetry_policy();
@@ -304,6 +337,27 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_buffer_pool_buffers_total Snapshot of pooled and allocated buffers"
);
let _ = writeln!(out, "# TYPE telemt_buffer_pool_buffers_total gauge");
let _ = writeln!(
out,
"telemt_buffer_pool_buffers_total{{kind=\"pooled\"}} {}",
stats.get_buffer_pool_pooled_gauge()
);
let _ = writeln!(
out,
"telemt_buffer_pool_buffers_total{{kind=\"allocated\"}} {}",
stats.get_buffer_pool_allocated_gauge()
);
let _ = writeln!(
out,
"telemt_buffer_pool_buffers_total{{kind=\"in_use\"}} {}",
stats.get_buffer_pool_in_use_gauge()
);
let _ = writeln!(
out,
"# HELP telemt_connections_total Total accepted connections"
@@ -349,6 +403,170 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_auth_expensive_checks_total Expensive authentication candidate checks executed during handshake validation"
);
let _ = writeln!(out, "# TYPE telemt_auth_expensive_checks_total counter");
let _ = writeln!(
out,
"telemt_auth_expensive_checks_total {}",
if core_enabled {
shared_state
.handshake
.auth_expensive_checks_total
.load(std::sync::atomic::Ordering::Relaxed)
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_auth_budget_exhausted_total Handshake validations that hit authentication candidate budget limits"
);
let _ = writeln!(out, "# TYPE telemt_auth_budget_exhausted_total counter");
let _ = writeln!(
out,
"telemt_auth_budget_exhausted_total {}",
if core_enabled {
shared_state
.handshake
.auth_budget_exhausted_total
.load(std::sync::atomic::Ordering::Relaxed)
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_accept_permit_timeout_total Accepted connections dropped due to permit wait timeout"
);
let _ = writeln!(out, "# TYPE telemt_accept_permit_timeout_total counter");
let _ = writeln!(
out,
"telemt_accept_permit_timeout_total {}",
if core_enabled {
stats.get_accept_permit_timeout_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_control_state Runtime conntrack control state flags"
);
let _ = writeln!(out, "# TYPE telemt_conntrack_control_state gauge");
let _ = writeln!(
out,
"telemt_conntrack_control_state{{flag=\"enabled\"}} {}",
if stats.get_conntrack_control_enabled() {
1
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_control_state{{flag=\"available\"}} {}",
if stats.get_conntrack_control_available() {
1
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_control_state{{flag=\"pressure_active\"}} {}",
if stats.get_conntrack_pressure_active() {
1
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_control_state{{flag=\"rule_apply_ok\"}} {}",
if stats.get_conntrack_rule_apply_ok() {
1
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_event_queue_depth Pending close events in conntrack control queue"
);
let _ = writeln!(out, "# TYPE telemt_conntrack_event_queue_depth gauge");
let _ = writeln!(
out,
"telemt_conntrack_event_queue_depth {}",
stats.get_conntrack_event_queue_depth()
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_delete_total Conntrack delete attempts by outcome"
);
let _ = writeln!(out, "# TYPE telemt_conntrack_delete_total counter");
let _ = writeln!(
out,
"telemt_conntrack_delete_total{{result=\"attempt\"}} {}",
if core_enabled {
stats.get_conntrack_delete_attempt_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_delete_total{{result=\"success\"}} {}",
if core_enabled {
stats.get_conntrack_delete_success_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_delete_total{{result=\"not_found\"}} {}",
if core_enabled {
stats.get_conntrack_delete_not_found_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_conntrack_delete_total{{result=\"error\"}} {}",
if core_enabled {
stats.get_conntrack_delete_error_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_conntrack_close_event_drop_total Dropped conntrack close events due to queue pressure or unavailable sender"
);
let _ = writeln!(
out,
"# TYPE telemt_conntrack_close_event_drop_total counter"
);
let _ = writeln!(
out,
"telemt_conntrack_close_event_drop_total {}",
if core_enabled {
stats.get_conntrack_close_event_drop_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_upstream_connect_attempt_total Upstream connect attempts across all requests"
@@ -952,6 +1170,39 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_me_c2me_enqueue_events_total ME client->ME enqueue outcomes"
);
let _ = writeln!(out, "# TYPE telemt_me_c2me_enqueue_events_total counter");
let _ = writeln!(
out,
"telemt_me_c2me_enqueue_events_total{{event=\"full\"}} {}",
if me_allows_normal {
stats.get_me_c2me_send_full_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_c2me_enqueue_events_total{{event=\"high_water\"}} {}",
if me_allows_normal {
stats.get_me_c2me_send_high_water_total()
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_c2me_enqueue_events_total{{event=\"timeout\"}} {}",
if me_allows_normal {
stats.get_me_c2me_send_timeout_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_d2c_batches_total Total DC->Client flush batches"
@@ -2501,6 +2752,48 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
if user_enabled { 0 } else { 1 }
);
let ip_memory = ip_tracker.memory_stats().await;
let _ = writeln!(
out,
"# HELP telemt_ip_tracker_users Number of users tracked by IP limiter state"
);
let _ = writeln!(out, "# TYPE telemt_ip_tracker_users gauge");
let _ = writeln!(
out,
"telemt_ip_tracker_users{{scope=\"active\"}} {}",
ip_memory.active_users
);
let _ = writeln!(
out,
"telemt_ip_tracker_users{{scope=\"recent\"}} {}",
ip_memory.recent_users
);
let _ = writeln!(
out,
"# HELP telemt_ip_tracker_entries Number of IP entries tracked by limiter state"
);
let _ = writeln!(out, "# TYPE telemt_ip_tracker_entries gauge");
let _ = writeln!(
out,
"telemt_ip_tracker_entries{{scope=\"active\"}} {}",
ip_memory.active_entries
);
let _ = writeln!(
out,
"telemt_ip_tracker_entries{{scope=\"recent\"}} {}",
ip_memory.recent_entries
);
let _ = writeln!(
out,
"# HELP telemt_ip_tracker_cleanup_queue_len Deferred disconnect cleanup queue length"
);
let _ = writeln!(out, "# TYPE telemt_ip_tracker_cleanup_queue_len gauge");
let _ = writeln!(
out,
"telemt_ip_tracker_cleanup_queue_len {}",
ip_memory.cleanup_queue_len
);
if user_enabled {
for entry in stats.iter_user_stats() {
let user = entry.key();
@@ -2634,6 +2927,7 @@ mod tests {
#[tokio::test]
async fn test_render_metrics_format() {
let stats = Arc::new(Stats::new());
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new();
let mut config = ProxyConfig::default();
config
@@ -2645,6 +2939,14 @@ mod tests {
stats.increment_connects_all();
stats.increment_connects_bad();
stats.increment_handshake_timeouts();
shared_state
.handshake
.auth_expensive_checks_total
.fetch_add(9, std::sync::atomic::Ordering::Relaxed);
shared_state
.handshake
.auth_budget_exhausted_total
.fetch_add(2, std::sync::atomic::Ordering::Relaxed);
stats.increment_upstream_connect_attempt_total();
stats.increment_upstream_connect_attempt_total();
stats.increment_upstream_connect_success_total();
@@ -2688,7 +2990,7 @@ mod tests {
.await
.unwrap();
let output = render_metrics(&stats, &config, &tracker).await;
let output = render_metrics(&stats, shared_state.as_ref(), &config, &tracker).await;
assert!(output.contains(&format!(
"telemt_build_info{{version=\"{}\"}} 1",
@@ -2697,6 +2999,8 @@ mod tests {
assert!(output.contains("telemt_connections_total 2"));
assert!(output.contains("telemt_connections_bad_total 1"));
assert!(output.contains("telemt_handshake_timeouts_total 1"));
assert!(output.contains("telemt_auth_expensive_checks_total 9"));
assert!(output.contains("telemt_auth_budget_exhausted_total 2"));
assert!(output.contains("telemt_upstream_connect_attempt_total 2"));
assert!(output.contains("telemt_upstream_connect_success_total 1"));
assert!(output.contains("telemt_upstream_connect_fail_total 1"));
@@ -2743,17 +3047,23 @@ mod tests {
assert!(output.contains("telemt_user_unique_ips_recent_window{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 4"));
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.250000"));
assert!(output.contains("telemt_ip_tracker_users{scope=\"active\"} 1"));
assert!(output.contains("telemt_ip_tracker_entries{scope=\"active\"} 1"));
assert!(output.contains("telemt_ip_tracker_cleanup_queue_len 0"));
}
#[tokio::test]
async fn test_render_empty_stats() {
let stats = Stats::new();
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new();
let config = ProxyConfig::default();
let output = render_metrics(&stats, &config, &tracker).await;
let output = render_metrics(&stats, &shared_state, &config, &tracker).await;
assert!(output.contains("telemt_connections_total 0"));
assert!(output.contains("telemt_connections_bad_total 0"));
assert!(output.contains("telemt_handshake_timeouts_total 0"));
assert!(output.contains("telemt_auth_expensive_checks_total 0"));
assert!(output.contains("telemt_auth_budget_exhausted_total 0"));
assert!(output.contains("telemt_user_unique_ips_current{user="));
assert!(output.contains("telemt_user_unique_ips_recent_window{user="));
}
@@ -2761,6 +3071,7 @@ mod tests {
#[tokio::test]
async fn test_render_uses_global_each_unique_ip_limit() {
let stats = Stats::new();
let shared_state = ProxySharedState::new();
stats.increment_user_connects("alice");
stats.increment_user_curr_connects("alice");
let tracker = UserIpTracker::new();
@@ -2771,7 +3082,7 @@ mod tests {
let mut config = ProxyConfig::default();
config.access.user_max_unique_ips_global_each = 2;
let output = render_metrics(&stats, &config, &tracker).await;
let output = render_metrics(&stats, &shared_state, &config, &tracker).await;
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 2"));
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.500000"));
@@ -2780,14 +3091,16 @@ mod tests {
#[tokio::test]
async fn test_render_has_type_annotations() {
let stats = Stats::new();
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new();
let config = ProxyConfig::default();
let output = render_metrics(&stats, &config, &tracker).await;
assert!(output.contains("# TYPE telemt_build_info gauge"));
let output = render_metrics(&stats, &shared_state, &config, &tracker).await;
assert!(output.contains("# TYPE telemt_uptime_seconds gauge"));
assert!(output.contains("# TYPE telemt_connections_total counter"));
assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
assert!(output.contains("# TYPE telemt_auth_expensive_checks_total counter"));
assert!(output.contains("# TYPE telemt_auth_budget_exhausted_total counter"));
assert!(output.contains("# TYPE telemt_upstream_connect_attempt_total counter"));
assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
@@ -2815,12 +3128,16 @@ mod tests {
assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge"));
assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
assert!(output.contains("# TYPE telemt_ip_tracker_users gauge"));
assert!(output.contains("# TYPE telemt_ip_tracker_entries gauge"));
assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_queue_len gauge"));
}
#[tokio::test]
async fn test_endpoint_integration() {
let stats = Arc::new(Stats::new());
let beobachten = Arc::new(BeobachtenStore::new());
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new();
let mut config = ProxyConfig::default();
stats.increment_connects_all();
@@ -2828,7 +3145,7 @@ mod tests {
stats.increment_connects_all();
let req = Request::builder().uri("/metrics").body(()).unwrap();
let resp = handle(req, &stats, &beobachten, &tracker, &config)
let resp = handle(req, &stats, &beobachten, shared_state.as_ref(), &tracker, &config)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
@@ -2855,7 +3172,14 @@ mod tests {
Duration::from_secs(600),
);
let req_beob = Request::builder().uri("/beobachten").body(()).unwrap();
let resp_beob = handle(req_beob, &stats, &beobachten, &tracker, &config)
let resp_beob = handle(
req_beob,
&stats,
&beobachten,
shared_state.as_ref(),
&tracker,
&config,
)
.await
.unwrap();
assert_eq!(resp_beob.status(), StatusCode::OK);
@@ -2865,7 +3189,14 @@ mod tests {
assert!(beob_text.contains("203.0.113.10-1"));
let req404 = Request::builder().uri("/other").body(()).unwrap();
let resp404 = handle(req404, &stats, &beobachten, &tracker, &config)
let resp404 = handle(
req404,
&stats,
&beobachten,
shared_state.as_ref(),
&tracker,
&config,
)
.await
.unwrap();
assert_eq!(resp404.status(), StatusCode::NOT_FOUND);