Add TLS front profile health metrics

This commit is contained in:
astronaut808
2026-05-03 18:06:59 +05:00
parent cdd2239047
commit 9f9ca9f270
6 changed files with 416 additions and 15 deletions

View File

@@ -18,12 +18,15 @@ use crate::ip_tracker::UserIpTracker;
use crate::proxy::shared_state::ProxySharedState;
use crate::stats::Stats;
use crate::stats::beobachten::BeobachtenStore;
use crate::tls_front::TlsFrontCache;
use crate::tls_front::cache;
use crate::tls_front::fetcher;
use crate::transport::{ListenOptions, create_listener};
// Keeps `/metrics` response size bounded when per-user telemetry is enabled.
const USER_LABELED_METRICS_MAX_USERS: usize = 4096;
// Keeps TLS-front per-domain health series bounded for large generated configs.
const TLS_FRONT_PROFILE_HEALTH_MAX_DOMAINS: usize = 256;
pub async fn serve(
port: u16,
@@ -33,6 +36,7 @@ pub async fn serve(
beobachten: Arc<BeobachtenStore>,
shared_state: Arc<ProxySharedState>,
ip_tracker: Arc<UserIpTracker>,
tls_cache: Option<Arc<TlsFrontCache>>,
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Vec<IpNetwork>,
) {
@@ -57,6 +61,7 @@ pub async fn serve(
beobachten,
shared_state,
ip_tracker,
tls_cache,
config_rx,
whitelist,
)
@@ -112,6 +117,7 @@ pub async fn serve(
beobachten,
shared_state,
ip_tracker,
tls_cache,
config_rx,
whitelist,
)
@@ -122,6 +128,7 @@ pub async fn serve(
let beobachten_v6 = beobachten.clone();
let shared_state_v6 = shared_state.clone();
let ip_tracker_v6 = ip_tracker.clone();
let tls_cache_v6 = tls_cache.clone();
let config_rx_v6 = config_rx.clone();
let whitelist_v6 = whitelist.clone();
tokio::spawn(async move {
@@ -131,6 +138,7 @@ pub async fn serve(
beobachten_v6,
shared_state_v6,
ip_tracker_v6,
tls_cache_v6,
config_rx_v6,
whitelist_v6,
)
@@ -142,6 +150,7 @@ pub async fn serve(
beobachten,
shared_state,
ip_tracker,
tls_cache,
config_rx,
whitelist,
)
@@ -171,6 +180,7 @@ async fn serve_listener(
beobachten: Arc<BeobachtenStore>,
shared_state: Arc<ProxySharedState>,
ip_tracker: Arc<UserIpTracker>,
tls_cache: Option<Arc<TlsFrontCache>>,
config_rx: tokio::sync::watch::Receiver<Arc<ProxyConfig>>,
whitelist: Arc<Vec<IpNetwork>>,
) {
@@ -192,6 +202,7 @@ async fn serve_listener(
let beobachten = beobachten.clone();
let shared_state = shared_state.clone();
let ip_tracker = ip_tracker.clone();
let tls_cache = tls_cache.clone();
let config_rx_conn = config_rx.clone();
tokio::spawn(async move {
let svc = service_fn(move |req| {
@@ -199,6 +210,7 @@ async fn serve_listener(
let beobachten = beobachten.clone();
let shared_state = shared_state.clone();
let ip_tracker = ip_tracker.clone();
let tls_cache = tls_cache.clone();
let config = config_rx_conn.borrow().clone();
async move {
handle(
@@ -207,6 +219,7 @@ async fn serve_listener(
&beobachten,
&shared_state,
&ip_tracker,
tls_cache.as_deref(),
&config,
)
.await
@@ -228,10 +241,11 @@ async fn handle<B>(
beobachten: &BeobachtenStore,
shared_state: &ProxySharedState,
ip_tracker: &UserIpTracker,
tls_cache: Option<&TlsFrontCache>,
config: &ProxyConfig,
) -> Result<Response<Full<Bytes>>, Infallible> {
if req.uri().path() == "/metrics" {
let body = render_metrics(stats, shared_state, config, ip_tracker).await;
let body = render_metrics(stats, shared_state, config, ip_tracker, tls_cache).await;
let resp = Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/plain; version=0.0.4; charset=utf-8")
@@ -266,11 +280,151 @@ fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> Stri
beobachten.snapshot_text(ttl)
}
fn tls_front_domains(config: &ProxyConfig) -> Vec<String> {
let mut domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
if !config.censorship.tls_domain.is_empty() {
domains.push(config.censorship.tls_domain.clone());
}
for domain in &config.censorship.tls_domains {
if !domain.is_empty() && !domains.contains(domain) {
domains.push(domain.clone());
}
}
domains
}
fn prometheus_label_value(value: &str) -> String {
value.replace('\\', "\\\\").replace('"', "\\\"")
}
async fn render_tls_front_profile_health(
out: &mut String,
config: &ProxyConfig,
tls_cache: Option<&TlsFrontCache>,
) {
use std::fmt::Write;
let domains = tls_front_domains(config);
let (health, suppressed) = match (config.censorship.tls_emulation, tls_cache) {
(true, Some(cache)) => {
cache
.profile_health_snapshot(&domains, TLS_FRONT_PROFILE_HEALTH_MAX_DOMAINS)
.await
}
_ => (Vec::new(), domains.len()),
};
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_domains TLS front configured profile domains by export status"
);
let _ = writeln!(out, "# TYPE telemt_tls_front_profile_domains gauge");
let _ = writeln!(
out,
"telemt_tls_front_profile_domains{{status=\"configured\"}} {}",
domains.len()
);
let _ = writeln!(
out,
"telemt_tls_front_profile_domains{{status=\"emitted\"}} {}",
health.len()
);
let _ = writeln!(
out,
"telemt_tls_front_profile_domains{{status=\"suppressed\"}} {}",
suppressed
);
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_info TLS front profile source and feature flags per configured domain"
);
let _ = writeln!(out, "# TYPE telemt_tls_front_profile_info gauge");
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_age_seconds Age of cached TLS front profile data per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_age_seconds gauge"
);
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_app_data_records TLS front cached app-data record count per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_app_data_records gauge"
);
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_ticket_records TLS front cached ticket-like tail record count per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_ticket_records gauge"
);
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_change_cipher_spec_records TLS front cached ChangeCipherSpec record count per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_change_cipher_spec_records gauge"
);
let _ = writeln!(
out,
"# HELP telemt_tls_front_profile_app_data_bytes TLS front cached total app-data bytes per configured domain"
);
let _ = writeln!(
out,
"# TYPE telemt_tls_front_profile_app_data_bytes gauge"
);
for item in health {
let domain = prometheus_label_value(&item.domain);
let _ = writeln!(
out,
"telemt_tls_front_profile_info{{domain=\"{}\",source=\"{}\",is_default=\"{}\",has_cert_info=\"{}\",has_cert_payload=\"{}\"}} 1",
domain,
item.source,
item.is_default,
item.has_cert_info,
item.has_cert_payload
);
let _ = writeln!(
out,
"telemt_tls_front_profile_age_seconds{{domain=\"{}\"}} {}",
domain, item.age_seconds
);
let _ = writeln!(
out,
"telemt_tls_front_profile_app_data_records{{domain=\"{}\"}} {}",
domain, item.app_data_records
);
let _ = writeln!(
out,
"telemt_tls_front_profile_ticket_records{{domain=\"{}\"}} {}",
domain, item.ticket_records
);
let _ = writeln!(
out,
"telemt_tls_front_profile_change_cipher_spec_records{{domain=\"{}\"}} {}",
domain, item.change_cipher_spec_count
);
let _ = writeln!(
out,
"telemt_tls_front_profile_app_data_bytes{{domain=\"{}\"}} {}",
domain, item.total_app_data_len
);
}
}
async fn render_metrics(
stats: &Stats,
shared_state: &ProxySharedState,
config: &ProxyConfig,
ip_tracker: &UserIpTracker,
tls_cache: Option<&TlsFrontCache>,
) -> String {
use std::fmt::Write;
let mut out = String::with_capacity(4096);
@@ -423,6 +577,7 @@ async fn render_metrics(
"telemt_tls_front_full_cert_budget_cap_drops_total {}",
cache::full_cert_sent_cap_drops_for_metrics()
);
render_tls_front_profile_health(&mut out, config, tls_cache).await;
let _ = writeln!(
out,
@@ -3361,6 +3516,11 @@ mod tests {
use super::*;
use http_body_util::BodyExt;
use std::net::IpAddr;
use std::time::SystemTime;
use crate::tls_front::types::{
CachedTlsData, ParsedServerHello, TlsBehaviorProfile, TlsCertPayload, TlsProfileSource,
};
#[tokio::test]
async fn test_render_metrics_format() {
@@ -3429,7 +3589,7 @@ mod tests {
.await
.unwrap();
let output = render_metrics(&stats, shared_state.as_ref(), &config, &tracker).await;
let output = render_metrics(&stats, shared_state.as_ref(), &config, &tracker, None).await;
assert!(output.contains(&format!(
"telemt_build_info{{version=\"{}\"}} 1",
@@ -3494,13 +3654,86 @@ mod tests {
assert!(output.contains("telemt_ip_tracker_cleanup_queue_len 0"));
}
#[tokio::test]
async fn test_render_tls_front_profile_health() {
let stats = Stats::new();
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new();
let mut config = ProxyConfig::default();
config.censorship.tls_domain = "primary.example".to_string();
config.censorship.tls_domains = vec!["fallback.example".to_string()];
let cache = TlsFrontCache::new(
&[
"primary.example".to_string(),
"fallback.example".to_string(),
],
1024,
"tlsfront-profile-health-test",
);
cache
.set(
"primary.example",
CachedTlsData {
server_hello_template: ParsedServerHello {
version: [0x03, 0x03],
random: [0u8; 32],
session_id: Vec::new(),
cipher_suite: [0x13, 0x01],
compression: 0,
extensions: Vec::new(),
},
cert_info: None,
cert_payload: Some(TlsCertPayload {
cert_chain_der: vec![vec![0x30, 0x01]],
certificate_message: vec![0x0b, 0x00, 0x00, 0x00],
}),
app_data_records_sizes: vec![1024, 512],
total_app_data_len: 1536,
behavior_profile: TlsBehaviorProfile {
change_cipher_spec_count: 1,
app_data_record_sizes: vec![1024, 512],
ticket_record_sizes: vec![69],
source: TlsProfileSource::Merged,
},
fetched_at: SystemTime::now(),
domain: "primary.example".to_string(),
},
)
.await;
let output = render_metrics(&stats, &shared_state, &config, &tracker, Some(&cache)).await;
assert!(output.contains("telemt_tls_front_profile_domains{status=\"configured\"} 2"));
assert!(output.contains("telemt_tls_front_profile_domains{status=\"emitted\"} 2"));
assert!(output.contains("telemt_tls_front_profile_domains{status=\"suppressed\"} 0"));
assert!(
output.contains("telemt_tls_front_profile_info{domain=\"primary.example\",source=\"merged\",is_default=\"false\",has_cert_info=\"false\",has_cert_payload=\"true\"} 1")
);
assert!(
output.contains("telemt_tls_front_profile_info{domain=\"fallback.example\",source=\"default\",is_default=\"true\",has_cert_info=\"false\",has_cert_payload=\"false\"} 1")
);
assert!(
output.contains("telemt_tls_front_profile_app_data_records{domain=\"primary.example\"} 2")
);
assert!(
output.contains("telemt_tls_front_profile_ticket_records{domain=\"primary.example\"} 1")
);
assert!(
output.contains("telemt_tls_front_profile_change_cipher_spec_records{domain=\"primary.example\"} 1")
);
assert!(
output.contains("telemt_tls_front_profile_app_data_bytes{domain=\"primary.example\"} 1536")
);
}
#[tokio::test]
async fn test_render_empty_stats() {
let stats = Stats::new();
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new();
let config = ProxyConfig::default();
let output = render_metrics(&stats, &shared_state, &config, &tracker).await;
let output = render_metrics(&stats, &shared_state, &config, &tracker, None).await;
assert!(output.contains("telemt_connections_total 0"));
assert!(output.contains("telemt_connections_bad_total 0"));
assert!(output.contains("telemt_handshake_timeouts_total 0"));
@@ -3524,7 +3757,7 @@ mod tests {
let mut config = ProxyConfig::default();
config.access.user_max_unique_ips_global_each = 2;
let output = render_metrics(&stats, &shared_state, &config, &tracker).await;
let output = render_metrics(&stats, &shared_state, &config, &tracker, None).await;
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 2"));
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.500000"));
@@ -3536,7 +3769,7 @@ mod tests {
let shared_state = ProxySharedState::new();
let tracker = UserIpTracker::new();
let config = ProxyConfig::default();
let output = render_metrics(&stats, &shared_state, &config, &tracker).await;
let output = render_metrics(&stats, &shared_state, &config, &tracker, None).await;
assert!(output.contains("# TYPE telemt_uptime_seconds gauge"));
assert!(output.contains("# TYPE telemt_connections_total counter"));
assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
@@ -3585,6 +3818,15 @@ mod tests {
assert!(
output.contains("# TYPE telemt_tls_front_full_cert_budget_cap_drops_total counter")
);
assert!(output.contains("# TYPE telemt_tls_front_profile_domains gauge"));
assert!(output.contains("# TYPE telemt_tls_front_profile_info gauge"));
assert!(output.contains("# TYPE telemt_tls_front_profile_age_seconds gauge"));
assert!(output.contains("# TYPE telemt_tls_front_profile_app_data_records gauge"));
assert!(output.contains("# TYPE telemt_tls_front_profile_ticket_records gauge"));
assert!(
output.contains("# TYPE telemt_tls_front_profile_change_cipher_spec_records gauge")
);
assert!(output.contains("# TYPE telemt_tls_front_profile_app_data_bytes gauge"));
}
#[tokio::test]
@@ -3605,6 +3847,7 @@ mod tests {
&beobachten,
shared_state.as_ref(),
&tracker,
None,
&config,
)
.await
@@ -3639,6 +3882,7 @@ mod tests {
&beobachten,
shared_state.as_ref(),
&tracker,
None,
&config,
)
.await
@@ -3656,6 +3900,7 @@ mod tests {
&beobachten,
shared_state.as_ref(),
&tracker,
None,
&config,
)
.await