diff --git a/src/metrics.rs b/src/metrics.rs index 3edf256..d59ca12 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -22,6 +22,9 @@ use crate::tls_front::cache; use crate::tls_front::fetcher; use crate::transport::{ListenOptions, create_listener}; +// Keeps `/metrics` response size bounded when per-user telemetry is enabled. +const USER_LABELED_METRICS_MAX_USERS: usize = 4096; + pub async fn serve( port: u16, listen: Option, @@ -313,6 +316,12 @@ async fn render_metrics( "telemt_telemetry_user_enabled {}", if user_enabled { 1 } else { 0 } ); + let _ = writeln!( + out, + "# HELP telemt_stats_user_entries Retained per-user stats entries" + ); + let _ = writeln!(out, "# TYPE telemt_stats_user_entries gauge"); + let _ = writeln!(out, "telemt_stats_user_entries {}", stats.user_stats_len()); let _ = writeln!( out, @@ -3071,17 +3080,6 @@ async fn render_metrics( 0 } ); - let _ = writeln!( - out, - "# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag" - ); - let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_suppressed gauge"); - let _ = writeln!( - out, - "telemt_telemetry_user_series_suppressed {}", - if user_enabled { 0 } else { 1 } - ); - let ip_memory = ip_tracker.memory_stats().await; let _ = writeln!( out, @@ -3154,10 +3152,20 @@ async fn render_metrics( ip_memory.recent_cap_rejects ); + let mut user_stats_emitted = 0usize; + let mut user_stats_suppressed = 0usize; + let mut unique_ip_emitted = 0usize; + let mut unique_ip_suppressed = 0usize; + if user_enabled { for entry in stats.iter_user_stats() { + if user_stats_emitted >= USER_LABELED_METRICS_MAX_USERS { + user_stats_suppressed = user_stats_suppressed.saturating_add(1); + continue; + } let user = entry.key(); let s = entry.value(); + user_stats_emitted = user_stats_emitted.saturating_add(1); let _ = writeln!( out, "telemt_user_connections_total{{user=\"{}\"}} {}", @@ -3236,6 +3244,11 @@ async fn render_metrics( let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge"); for user in unique_users { + if unique_ip_emitted >= USER_LABELED_METRICS_MAX_USERS { + unique_ip_suppressed = unique_ip_suppressed.saturating_add(1); + continue; + } + unique_ip_emitted = unique_ip_emitted.saturating_add(1); let current = ip_counts.get(&user).copied().unwrap_or(0); let limit = config .access @@ -3275,6 +3288,46 @@ async fn render_metrics( } } + let _ = writeln!( + out, + "# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag" + ); + let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_suppressed gauge"); + let _ = writeln!( + out, + "telemt_telemetry_user_series_suppressed {}", + if user_enabled && user_stats_suppressed == 0 && unique_ip_suppressed == 0 { + 0 + } else { + 1 + } + ); + let _ = writeln!( + out, + "# HELP telemt_telemetry_user_series_users User-labeled metric users by export status" + ); + let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_users gauge"); + let _ = writeln!( + out, + "telemt_telemetry_user_series_users{{family=\"stats\",status=\"emitted\"}} {}", + user_stats_emitted + ); + let _ = writeln!( + out, + "telemt_telemetry_user_series_users{{family=\"stats\",status=\"suppressed\"}} {}", + user_stats_suppressed + ); + let _ = writeln!( + out, + "telemt_telemetry_user_series_users{{family=\"unique_ip\",status=\"emitted\"}} {}", + unique_ip_emitted + ); + let _ = writeln!( + out, + "telemt_telemetry_user_series_users{{family=\"unique_ip\",status=\"suppressed\"}} {}", + unique_ip_suppressed + ); + out } @@ -3488,6 +3541,8 @@ mod tests { assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge")); assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge")); assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge")); + assert!(output.contains("# TYPE telemt_stats_user_entries gauge")); + assert!(output.contains("# TYPE telemt_telemetry_user_series_users gauge")); assert!(output.contains("# TYPE telemt_ip_tracker_users gauge")); assert!(output.contains("# TYPE telemt_ip_tracker_entries gauge")); assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_queue_len gauge")); diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 1ce74d6..bd15faa 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -1432,8 +1432,8 @@ impl RunningClientHandler { /// Main dispatch after successful handshake. /// Two modes: - /// - Direct: TCP relay to TG DC (existing behavior) - /// - Middle Proxy: RPC multiplex through ME pool (new — supports CDN DCs) + /// - Direct: TCP relay to TG DC (existing behavior) + /// - Middle Proxy: RPC multiplex through ME pool (supports CDN DCs) #[cfg(test)] async fn handle_authenticated_static( client_reader: CryptoReader, diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 9678f2a..cff9571 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -2477,6 +2477,11 @@ impl Stats { self.user_stats.iter() } + /// Current number of retained per-user stats entries. + pub fn user_stats_len(&self) -> usize { + self.user_stats.len() + } + pub fn uptime_secs(&self) -> f64 { self.start_time .read() diff --git a/src/tls_front/cache.rs b/src/tls_front/cache.rs index 8c2b6e5..8028d6e 100644 --- a/src/tls_front/cache.rs +++ b/src/tls_front/cache.rs @@ -1,4 +1,6 @@ use std::collections::HashMap; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; use std::net::IpAddr; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; @@ -15,6 +17,7 @@ use crate::tls_front::types::{ const FULL_CERT_SENT_SWEEP_INTERVAL_SECS: u64 = 30; const FULL_CERT_SENT_MAX_IPS: usize = 65_536; +const FULL_CERT_SENT_SHARDS: usize = 64; static FULL_CERT_SENT_IPS_GAUGE: AtomicU64 = AtomicU64::new(0); static FULL_CERT_SENT_CAP_DROPS: AtomicU64 = AtomicU64::new(0); @@ -34,7 +37,7 @@ pub(crate) fn full_cert_sent_cap_drops_for_metrics() -> u64 { pub struct TlsFrontCache { memory: RwLock>>, default: Arc, - full_cert_sent: RwLock>, + full_cert_sent_shards: Vec>>, full_cert_sent_last_sweep_epoch_secs: AtomicU64, disk_path: PathBuf, } @@ -70,7 +73,9 @@ impl TlsFrontCache { Self { memory: RwLock::new(map), default, - full_cert_sent: RwLock::new(HashMap::new()), + full_cert_sent_shards: (0..FULL_CERT_SENT_SHARDS) + .map(|_| RwLock::new(HashMap::new())) + .collect(), full_cert_sent_last_sweep_epoch_secs: AtomicU64::new(0), disk_path: disk_path.as_ref().to_path_buf(), } @@ -88,6 +93,54 @@ impl TlsFrontCache { self.memory.read().await.contains_key(domain) } + fn full_cert_sent_shard_index(client_ip: IpAddr) -> usize { + let mut hasher = DefaultHasher::new(); + client_ip.hash(&mut hasher); + (hasher.finish() as usize) % FULL_CERT_SENT_SHARDS + } + + fn full_cert_sent_shard(&self, client_ip: IpAddr) -> &RwLock> { + &self.full_cert_sent_shards[Self::full_cert_sent_shard_index(client_ip)] + } + + fn decrement_full_cert_sent_entries(amount: usize) { + if amount == 0 { + return; + } + let amount = amount as u64; + let _ = + FULL_CERT_SENT_IPS_GAUGE.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| { + Some(current.saturating_sub(amount)) + }); + } + + fn try_reserve_full_cert_sent_entry() -> bool { + let mut current = FULL_CERT_SENT_IPS_GAUGE.load(Ordering::Relaxed); + loop { + if current >= FULL_CERT_SENT_MAX_IPS as u64 { + return false; + } + match FULL_CERT_SENT_IPS_GAUGE.compare_exchange_weak( + current, + current.saturating_add(1), + Ordering::AcqRel, + Ordering::Relaxed, + ) { + Ok(_) => return true, + Err(actual) => current = actual, + } + } + } + + async fn sweep_full_cert_sent_shards(&self, now: Instant, ttl: Duration) { + for shard in &self.full_cert_sent_shards { + let mut guard = shard.write().await; + let before = guard.len(); + guard.retain(|_, seen_at| now.duration_since(*seen_at) < ttl); + Self::decrement_full_cert_sent_entries(before.saturating_sub(guard.len())); + } + } + /// Returns true when full cert payload should be sent for client_ip /// according to TTL policy. pub async fn take_full_cert_budget_for_ip(&self, client_ip: IpAddr, ttl: Duration) -> bool { @@ -113,11 +166,11 @@ impl TlsFrontCache { }) .is_ok(); - let mut guard = self.full_cert_sent.write().await; if should_sweep { - guard.retain(|_, seen_at| now.duration_since(*seen_at) < ttl); + self.sweep_full_cert_sent_shards(now, ttl).await; } + let mut guard = self.full_cert_sent_shard(client_ip).write().await; let allowed = match guard.get_mut(&client_ip) { Some(seen_at) => { if now.duration_since(*seen_at) >= ttl { @@ -128,19 +181,43 @@ impl TlsFrontCache { } } None => { - if guard.len() >= FULL_CERT_SENT_MAX_IPS { + if !Self::try_reserve_full_cert_sent_entry() { FULL_CERT_SENT_CAP_DROPS.fetch_add(1, Ordering::Relaxed); - FULL_CERT_SENT_IPS_GAUGE.store(guard.len() as u64, Ordering::Relaxed); return false; } guard.insert(client_ip, now); true } }; - FULL_CERT_SENT_IPS_GAUGE.store(guard.len() as u64, Ordering::Relaxed); allowed } + #[cfg(test)] + async fn insert_full_cert_sent_for_tests(&self, client_ip: IpAddr, seen_at: Instant) { + let mut guard = self.full_cert_sent_shard(client_ip).write().await; + if guard.insert(client_ip, seen_at).is_none() { + FULL_CERT_SENT_IPS_GAUGE.fetch_add(1, Ordering::Relaxed); + } + } + + #[cfg(test)] + async fn full_cert_sent_is_empty_for_tests(&self) -> bool { + for shard in &self.full_cert_sent_shards { + if !shard.read().await.is_empty() { + return false; + } + } + true + } + + #[cfg(test)] + async fn full_cert_sent_contains_for_tests(&self, client_ip: IpAddr) -> bool { + self.full_cert_sent_shard(client_ip) + .read() + .await + .contains_key(&client_ip) + } + pub async fn set(&self, domain: &str, data: CachedTlsData) { let mut guard = self.memory.write().await; guard.insert(domain.to_string(), Arc::new(data)); @@ -381,7 +458,7 @@ mod tests { assert!(cache.take_full_cert_budget_for_ip(ip, ttl).await); } - assert!(cache.full_cert_sent.read().await.is_empty()); + assert!(cache.full_cert_sent_is_empty_for_tests().await); } #[tokio::test] @@ -395,19 +472,16 @@ mod tests { .unwrap_or_else(Instant::now); cache - .full_cert_sent - .write() - .await - .insert(stale_ip, stale_seen_at); + .insert_full_cert_sent_for_tests(stale_ip, stale_seen_at) + .await; cache .full_cert_sent_last_sweep_epoch_secs .store(0, Ordering::Relaxed); assert!(cache.take_full_cert_budget_for_ip(new_ip, ttl).await); - let guard = cache.full_cert_sent.read().await; - assert!(!guard.contains_key(&stale_ip)); - assert!(guard.contains_key(&new_ip)); + assert!(!cache.full_cert_sent_contains_for_tests(stale_ip).await); + assert!(cache.full_cert_sent_contains_for_tests(new_ip).await); } #[tokio::test] @@ -425,18 +499,15 @@ mod tests { .as_secs(); cache - .full_cert_sent - .write() - .await - .insert(stale_ip, stale_seen_at); + .insert_full_cert_sent_for_tests(stale_ip, stale_seen_at) + .await; cache .full_cert_sent_last_sweep_epoch_secs .store(now_epoch_secs, Ordering::Relaxed); assert!(cache.take_full_cert_budget_for_ip(new_ip, ttl).await); - let guard = cache.full_cert_sent.read().await; - assert!(guard.contains_key(&stale_ip)); - assert!(guard.contains_key(&new_ip)); + assert!(cache.full_cert_sent_contains_for_tests(stale_ip).await); + assert!(cache.full_cert_sent_contains_for_tests(new_ip).await); } }