diff --git a/src/config/load.rs b/src/config/load.rs
index 1e455b8..20faf20 100644
--- a/src/config/load.rs
+++ b/src/config/load.rs
@@ -1087,9 +1087,9 @@ impl ProxyConfig {
             ));
         }
 
-        if config.general.me_route_blocking_send_timeout_ms > 5000 {
+        if !(1..=5000).contains(&config.general.me_route_blocking_send_timeout_ms) {
             return Err(ProxyError::Config(
-                "general.me_route_blocking_send_timeout_ms must be within [0, 5000]".to_string(),
+                "general.me_route_blocking_send_timeout_ms must be within [1, 5000]".to_string(),
             ));
         }
 
@@ -2602,6 +2602,26 @@ mod tests {
         let _ = std::fs::remove_file(path);
     }
 
+    #[test]
+    fn me_route_blocking_send_timeout_ms_zero_is_rejected() {
+        let toml = r#"
+            [general]
+            me_route_blocking_send_timeout_ms = 0
+
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_me_route_blocking_send_timeout_zero_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let err = ProxyConfig::load(&path).unwrap_err().to_string();
+        assert!(err.contains("general.me_route_blocking_send_timeout_ms must be within [1, 5000]"));
+        let _ = std::fs::remove_file(path);
+    }
+
     #[test]
     fn me_route_no_writer_mode_is_parsed() {
         let toml = r#"
diff --git a/src/config/types.rs b/src/config/types.rs
index b1260c7..4762083 100644
--- a/src/config/types.rs
+++ b/src/config/types.rs
@@ -778,7 +778,7 @@ pub struct GeneralConfig {
     pub me_route_hybrid_max_wait_ms: u64,
 
     /// Maximum wait in milliseconds for blocking ME writer channel send fallback.
-    /// `0` keeps legacy unbounded wait behavior.
+    /// Must be within [1, 5000].
     #[serde(default = "default_me_route_blocking_send_timeout_ms")]
     pub me_route_blocking_send_timeout_ms: u64,
 
diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs
index de87aa7..e7b5185 100644
--- a/src/ip_tracker.rs
+++ b/src/ip_tracker.rs
@@ -9,14 +9,24 @@ use std::sync::Mutex;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::time::{Duration, Instant};
 
-use tokio::sync::{Mutex as AsyncMutex, RwLock};
+use tokio::sync::{Mutex as AsyncMutex, RwLock, RwLockWriteGuard};
 
 use crate::config::UserMaxUniqueIpsMode;
 
+const CLEANUP_DRAIN_BATCH_LIMIT: usize = 1024;
+const MAX_ACTIVE_IP_ENTRIES: u64 = 131_072;
+const MAX_RECENT_IP_ENTRIES: u64 = 262_144;
+
+/// Tracks active and recent client IPs for per-user admission control.
 #[derive(Debug, Clone)]
 pub struct UserIpTracker {
     active_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, usize>>>>,
     recent_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, Instant>>>>,
+    active_entry_count: Arc<AtomicU64>,
+    recent_entry_count: Arc<AtomicU64>,
+    active_cap_rejects: Arc<AtomicU64>,
+    recent_cap_rejects: Arc<AtomicU64>,
+    cleanup_deferred_releases: Arc<AtomicU64>,
     max_ips: Arc<RwLock<HashMap<String, usize>>>,
     default_max_ips: Arc<RwLock<usize>>,
     limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
@@ -26,13 +36,25 @@ pub struct UserIpTracker {
     cleanup_drain_lock: Arc<AsyncMutex<()>>,
 }
 
+/// Point-in-time memory counters for user/IP limiter state.
 #[derive(Debug, Clone, Copy)]
 pub struct UserIpTrackerMemoryStats {
+    /// Number of users with active IP state.
     pub active_users: usize,
+    /// Number of users with recent IP state.
     pub recent_users: usize,
+    /// Number of active `(user, ip)` entries.
     pub active_entries: usize,
+    /// Number of recent-window `(user, ip)` entries.
     pub recent_entries: usize,
+    /// Number of deferred disconnect cleanups waiting to be drained.
     pub cleanup_queue_len: usize,
+    /// Number of new connections rejected by the global active-entry cap.
+    pub active_cap_rejects: u64,
+    /// Number of new connections rejected by the global recent-entry cap.
+    pub recent_cap_rejects: u64,
+    /// Number of release cleanups deferred through the cleanup queue.
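+    /// Monotonic over the process lifetime; draining the queue does not decrease it.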
+    pub cleanup_deferred_releases: u64,
 }
 
 impl UserIpTracker {
@@ -40,6 +62,11 @@
         Self {
             active_ips: Arc::new(RwLock::new(HashMap::new())),
             recent_ips: Arc::new(RwLock::new(HashMap::new())),
+            active_entry_count: Arc::new(AtomicU64::new(0)),
+            recent_entry_count: Arc::new(AtomicU64::new(0)),
+            active_cap_rejects: Arc::new(AtomicU64::new(0)),
+            recent_cap_rejects: Arc::new(AtomicU64::new(0)),
+            cleanup_deferred_releases: Arc::new(AtomicU64::new(0)),
             max_ips: Arc::new(RwLock::new(HashMap::new())),
             default_max_ips: Arc::new(RwLock::new(0)),
             limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)),
@@ -50,16 +77,59 @@
         }
     }
 
+    fn decrement_counter(counter: &AtomicU64, amount: usize) {
+        if amount == 0 {
+            return;
+        }
+        let amount = amount as u64;
+        let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
+            Some(current.saturating_sub(amount))
+        });
+    }
+
+    fn apply_active_cleanup(
+        active_ips: &mut HashMap<String, HashMap<IpAddr, usize>>,
+        user: &str,
+        ip: IpAddr,
+        pending_count: usize,
+    ) -> usize {
+        if pending_count == 0 {
+            return 0;
+        }
+
+        let mut remove_user = false;
+        let mut removed_active_entries = 0usize;
+        if let Some(user_ips) = active_ips.get_mut(user) {
+            if let Some(count) = user_ips.get_mut(&ip) {
+                if *count > pending_count {
+                    *count -= pending_count;
+                } else if user_ips.remove(&ip).is_some() {
+                    removed_active_entries = 1;
+                }
+            }
+            remove_user = user_ips.is_empty();
+        }
+        if remove_user {
+            active_ips.remove(user);
+        }
+        removed_active_entries
+    }
+
+    /// Queues a deferred active IP cleanup for a later async drain.
     pub fn enqueue_cleanup(&self, user: String, ip: IpAddr) {
         match self.cleanup_queue.lock() {
             Ok(mut queue) => {
                 let count = queue.entry((user, ip)).or_insert(0);
                 *count = count.saturating_add(1);
+                self.cleanup_deferred_releases
+                    .fetch_add(1, Ordering::Relaxed);
             }
             Err(poisoned) => {
                 let mut queue = poisoned.into_inner();
                 let count = queue.entry((user.clone(), ip)).or_insert(0);
                 *count = count.saturating_add(1);
+                self.cleanup_deferred_releases
+                    .fetch_add(1, Ordering::Relaxed);
                 self.cleanup_queue.clear_poison();
                 tracing::warn!(
                     "UserIpTracker cleanup_queue lock poisoned; recovered and enqueued IP cleanup for {} ({})",
@@ -86,16 +156,27 @@ impl UserIpTracker {
     }
 
     pub(crate) async fn drain_cleanup_queue(&self) {
-        // Serialize queue draining and active-IP mutation so check-and-add cannot
-        // observe stale active entries that are already queued for removal.
-        let _drain_guard = self.cleanup_drain_lock.lock().await;
+        let Ok(_drain_guard) = self.cleanup_drain_lock.try_lock() else {
+            return;
+        };
+
         let to_remove = {
             match self.cleanup_queue.lock() {
                 Ok(mut queue) => {
                     if queue.is_empty() {
                         return;
                     }
-                    std::mem::take(&mut *queue)
+                    let mut drained =
+                        HashMap::with_capacity(queue.len().min(CLEANUP_DRAIN_BATCH_LIMIT));
+                    for _ in 0..CLEANUP_DRAIN_BATCH_LIMIT {
+                        let Some(key) = queue.keys().next().cloned() else {
+                            break;
+                        };
+                        if let Some(count) = queue.remove(&key) {
+                            drained.insert(key, count);
+                        }
+                    }
+                    drained
                 }
                 Err(poisoned) => {
                     let mut queue = poisoned.into_inner();
@@ -103,31 +184,33 @@ impl UserIpTracker {
                         self.cleanup_queue.clear_poison();
                         return;
                     }
-                    let drained = std::mem::take(&mut *queue);
+                    let mut drained =
+                        HashMap::with_capacity(queue.len().min(CLEANUP_DRAIN_BATCH_LIMIT));
+                    for _ in 0..CLEANUP_DRAIN_BATCH_LIMIT {
+                        let Some(key) = queue.keys().next().cloned() else {
+                            break;
+                        };
+                        if let Some(count) = queue.remove(&key) {
+                            drained.insert(key, count);
+                        }
+                    }
                     self.cleanup_queue.clear_poison();
                     drained
                 }
             }
         };
 
+        if to_remove.is_empty() {
+            return;
+        }
+
         let mut active_ips = self.active_ips.write().await;
+        let mut removed_active_entries = 0usize;
        for ((user, ip), pending_count) in to_remove {
-            if pending_count == 0 {
-                continue;
-            }
-            if let Some(user_ips) = active_ips.get_mut(&user) {
-                if let Some(count) = user_ips.get_mut(&ip) {
-                    if *count > pending_count {
-                        *count -= pending_count;
-                    } else {
-                        user_ips.remove(&ip);
-                    }
-                }
-                if user_ips.is_empty() {
-                    active_ips.remove(&user);
-                }
-            }
+            removed_active_entries = removed_active_entries.saturating_add(
+                Self::apply_active_cleanup(&mut active_ips, &user, ip, pending_count),
+            );
         }
+        Self::decrement_counter(&self.active_entry_count, removed_active_entries);
     }
 
     fn now_epoch_secs() -> u64 {
         SystemTime::now()
             .duration_since(SystemTime::UNIX_EPOCH)
             .unwrap_or_default()
             .as_secs()
     }
 
+    async fn active_and_recent_write(
+        &self,
+    ) -> (
+        RwLockWriteGuard<'_, HashMap<String, HashMap<IpAddr, usize>>>,
+        RwLockWriteGuard<'_, HashMap<String, HashMap<IpAddr, Instant>>>,
+    ) {
+        loop {
+            let active_ips = self.active_ips.write().await;
+            match self.recent_ips.try_write() {
+                Ok(recent_ips) => return (active_ips, recent_ips),
+                Err(_) => {
+                    drop(active_ips);
+                    tokio::task::yield_now().await;
+                }
+            }
+        }
+    }
+
     async fn maybe_compact_empty_users(&self) {
         const COMPACT_INTERVAL_SECS: u64 = 60;
         let now_epoch_secs = Self::now_epoch_secs();
@@ -157,14 +258,16 @@
             return;
         }
 
-        let mut active_ips = self.active_ips.write().await;
-        let mut recent_ips = self.recent_ips.write().await;
         let window = *self.limit_window.read().await;
         let now = Instant::now();
+        let (mut active_ips, mut recent_ips) = self.active_and_recent_write().await;
 
+        let mut pruned_recent_entries = 0usize;
         for user_recent in recent_ips.values_mut() {
-            Self::prune_recent(user_recent, now, window);
+            pruned_recent_entries =
+                pruned_recent_entries.saturating_add(Self::prune_recent(user_recent, now, window));
         }
+        Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries);
 
         let mut users =
             Vec::<String>::with_capacity(active_ips.len().saturating_add(recent_ips.len()));
@@ -208,6 +311,9 @@
             active_entries,
             recent_entries,
             cleanup_queue_len,
+            active_cap_rejects: self.active_cap_rejects.load(Ordering::Relaxed),
+            recent_cap_rejects: self.recent_cap_rejects.load(Ordering::Relaxed),
+            cleanup_deferred_releases: self.cleanup_deferred_releases.load(Ordering::Relaxed),
         }
     }
 
@@ -238,11 +344,17 @@
         max_ips.clone_from(limits);
     }
 
-    fn prune_recent(user_recent: &mut HashMap<IpAddr, Instant>, now: Instant, window: Duration) {
+    fn prune_recent(
+        user_recent: &mut HashMap<IpAddr, Instant>,
+        now: Instant,
+        window: Duration,
+    ) -> usize {
         if user_recent.is_empty() {
-            return;
+            return 0;
         }
+        let before = user_recent.len();
         user_recent.retain(|_, seen_at| now.duration_since(*seen_at) <= window);
+        before.saturating_sub(user_recent.len())
     }
 
     pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> {
@@ -261,24 +373,36 @@
         let window = *self.limit_window.read().await;
         let now = Instant::now();
 
-        let mut active_ips = self.active_ips.write().await;
+        let (mut active_ips, mut recent_ips) = self.active_and_recent_write().await;
         let user_active = active_ips
             .entry(username.to_string())
             .or_insert_with(HashMap::new);
-
-        let mut recent_ips = self.recent_ips.write().await;
         let user_recent = recent_ips
             .entry(username.to_string())
             .or_insert_with(HashMap::new);
 
-        Self::prune_recent(user_recent, now, window);
+        let pruned_recent_entries = Self::prune_recent(user_recent, now, window);
+        Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries);
+        let recent_contains_ip = user_recent.contains_key(&ip);
 
         if let Some(count) = user_active.get_mut(&ip) {
+            if !recent_contains_ip
+                && self.recent_entry_count.load(Ordering::Relaxed) >= MAX_RECENT_IP_ENTRIES
+            {
+                self.recent_cap_rejects.fetch_add(1, Ordering::Relaxed);
+                return Err(format!(
+                    "IP tracker recent entry cap reached: entries={}/{}",
+                    self.recent_entry_count.load(Ordering::Relaxed),
+                    MAX_RECENT_IP_ENTRIES
+                ));
+            }
             *count = count.saturating_add(1);
-            user_recent.insert(ip, now);
+            if user_recent.insert(ip, now).is_none() {
+                self.recent_entry_count.fetch_add(1, Ordering::Relaxed);
+            }
             return Ok(());
         }
 
-        let is_new_ip = !user_recent.contains_key(&ip);
+        let is_new_ip = !recent_contains_ip;
 
         if let Some(limit) = limit {
             let active_limit_reached = user_active.len() >= limit;
@@ -302,30 +426,62 @@
             }
         }
 
-        user_active.insert(ip, 1);
-        user_recent.insert(ip, now);
+        if self.active_entry_count.load(Ordering::Relaxed) >= MAX_ACTIVE_IP_ENTRIES {
+            self.active_cap_rejects.fetch_add(1, Ordering::Relaxed);
+            return Err(format!(
+                "IP tracker active entry cap reached: entries={}/{}",
+                self.active_entry_count.load(Ordering::Relaxed),
+                MAX_ACTIVE_IP_ENTRIES
+            ));
+        }
+        if is_new_ip && self.recent_entry_count.load(Ordering::Relaxed) >= MAX_RECENT_IP_ENTRIES {
+            self.recent_cap_rejects.fetch_add(1, Ordering::Relaxed);
+            return Err(format!(
+                "IP tracker recent entry cap reached: entries={}/{}",
+                self.recent_entry_count.load(Ordering::Relaxed),
+                MAX_RECENT_IP_ENTRIES
+            ));
+        }
+
+        if user_active.insert(ip, 1).is_none() {
+            self.active_entry_count.fetch_add(1, Ordering::Relaxed);
+        }
+        if user_recent.insert(ip, now).is_none() {
+            self.recent_entry_count.fetch_add(1, Ordering::Relaxed);
+        }
 
         Ok(())
     }
 
     pub async fn remove_ip(&self, username: &str, ip: IpAddr) {
         self.maybe_compact_empty_users().await;
         let mut active_ips = self.active_ips.write().await;
+        let mut removed_active_entries = 0usize;
         if let Some(user_ips) = active_ips.get_mut(username) {
             if let Some(count) = user_ips.get_mut(&ip) {
                 if *count > 1 {
                     *count -= 1;
                 } else {
-                    user_ips.remove(&ip);
+                    if user_ips.remove(&ip).is_some() {
+                        removed_active_entries = 1;
+                    }
                 }
             }
             if user_ips.is_empty() {
                 active_ips.remove(username);
             }
         }
+        Self::decrement_counter(&self.active_entry_count, removed_active_entries);
     }
 
     pub async fn get_recent_counts_for_users(&self, users: &[String]) -> HashMap<String, usize> {
         self.drain_cleanup_queue().await;
+        self.get_recent_counts_for_users_snapshot(users).await
+    }
+
+    pub(crate) async fn get_recent_counts_for_users_snapshot(
+        &self,
+        users: &[String],
+    ) -> HashMap<String, usize> {
         let window = *self.limit_window.read().await;
         let now = Instant::now();
         let recent_ips = self.recent_ips.read().await;
@@ -400,19 +556,29 @@
     pub async fn get_stats(&self) -> Vec<(String, usize, usize)> {
         self.drain_cleanup_queue().await;
+        self.get_stats_snapshot().await
+    }
+
+    pub(crate) async fn get_stats_snapshot(&self) -> Vec<(String, usize, usize)> {
         let active_ips = self.active_ips.read().await;
+        let active_counts = active_ips
+            .iter()
+            .map(|(username, user_ips)| (username.clone(), user_ips.len()))
+            .collect::<Vec<_>>();
+        drop(active_ips);
+
         let max_ips = self.max_ips.read().await;
         let default_max_ips = *self.default_max_ips.read().await;
 
-        let mut stats = Vec::new();
-        for (username, user_ips) in active_ips.iter() {
+        let mut stats = Vec::with_capacity(active_counts.len());
+        for (username, active_count) in active_counts {
             let limit = max_ips
-                .get(username)
+                .get(&username)
                 .copied()
                 .filter(|limit| *limit > 0)
                 .or((default_max_ips > 0).then_some(default_max_ips))
                 .unwrap_or(0);
-            stats.push((username.clone(), user_ips.len(), limit));
+            stats.push((username, active_count, limit));
         }
 
         stats.sort_by(|a, b| a.0.cmp(&b.0));
@@ -421,20 +587,30 @@
     pub async fn clear_user_ips(&self, username: &str) {
         let mut active_ips = self.active_ips.write().await;
-        active_ips.remove(username);
+        let removed_active_entries = active_ips
+            .remove(username)
+            .map(|ips| ips.len())
+            .unwrap_or(0);
         drop(active_ips);
+        Self::decrement_counter(&self.active_entry_count, removed_active_entries);
 
         let mut recent_ips = self.recent_ips.write().await;
-        recent_ips.remove(username);
+        let removed_recent_entries = recent_ips
+            .remove(username)
+            .map(|ips| ips.len())
+            .unwrap_or(0);
+        Self::decrement_counter(&self.recent_entry_count, removed_recent_entries);
     }
 
     pub async fn clear_all(&self) {
         let mut active_ips = self.active_ips.write().await;
         active_ips.clear();
         drop(active_ips);
+        self.active_entry_count.store(0, Ordering::Relaxed);
 
         let mut recent_ips = self.recent_ips.write().await;
         recent_ips.clear();
+        self.recent_entry_count.store(0, Ordering::Relaxed);
     }
 
     pub async fn is_ip_active(&self, username: &str, ip: IpAddr) -> bool {
diff --git a/src/metrics.rs b/src/metrics.rs
index ba44a5f..addb18f 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -18,8 +18,13 @@ use crate::ip_tracker::UserIpTracker;
 use crate::proxy::shared_state::ProxySharedState;
 use crate::stats::Stats;
 use crate::stats::beobachten::BeobachtenStore;
+use crate::tls_front::cache;
+use crate::tls_front::fetcher;
 use crate::transport::{ListenOptions, create_listener};
 
+// Keeps `/metrics` response size bounded when per-user telemetry is enabled.
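+// Series beyond the cap are skipped, counted, and reported via
+// telemt_telemetry_user_series_users{status="suppressed"}.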
+const USER_LABELED_METRICS_MAX_USERS: usize = 4096;
+
 pub async fn serve(
     port: u16,
     listen: Option,
@@ -311,6 +316,12 @@
         "telemt_telemetry_user_enabled {}",
         if user_enabled { 1 } else { 0 }
     );
+    let _ = writeln!(
+        out,
+        "# HELP telemt_stats_user_entries Retained per-user stats entries"
+    );
+    let _ = writeln!(out, "# TYPE telemt_stats_user_entries gauge");
+    let _ = writeln!(out, "telemt_stats_user_entries {}", stats.user_stats_len());
 
     let _ = writeln!(
         out,
@@ -366,6 +377,53 @@
         stats.get_buffer_pool_in_use_gauge()
     );
 
+    let _ = writeln!(
+        out,
+        "# HELP telemt_tls_fetch_profile_cache_entries Current adaptive TLS fetch profile-cache entries"
+    );
+    let _ = writeln!(out, "# TYPE telemt_tls_fetch_profile_cache_entries gauge");
+    let _ = writeln!(
+        out,
+        "telemt_tls_fetch_profile_cache_entries {}",
+        fetcher::profile_cache_entries_for_metrics()
+    );
+    let _ = writeln!(
+        out,
+        "# HELP telemt_tls_fetch_profile_cache_cap_drops_total Profile-cache winner inserts skipped because the cache cap was reached"
+    );
+    let _ = writeln!(
+        out,
+        "# TYPE telemt_tls_fetch_profile_cache_cap_drops_total counter"
+    );
+    let _ = writeln!(
+        out,
+        "telemt_tls_fetch_profile_cache_cap_drops_total {}",
+        fetcher::profile_cache_cap_drops_for_metrics()
+    );
+    let _ = writeln!(
+        out,
+        "# HELP telemt_tls_front_full_cert_budget_ips Current IP entries tracked by TLS full-cert budget"
+    );
+    let _ = writeln!(out, "# TYPE telemt_tls_front_full_cert_budget_ips gauge");
+    let _ = writeln!(
+        out,
+        "telemt_tls_front_full_cert_budget_ips {}",
+        cache::full_cert_sent_ips_for_metrics()
+    );
+    let _ = writeln!(
+        out,
+        "# HELP telemt_tls_front_full_cert_budget_cap_drops_total New IPs denied full-cert budget tracking because the cap was reached"
+    );
+    let _ = writeln!(
+        out,
+        "# TYPE telemt_tls_front_full_cert_budget_cap_drops_total counter"
+    );
+    let _ = writeln!(
+        out,
+        "telemt_tls_front_full_cert_budget_cap_drops_total {}",
+        cache::full_cert_sent_cap_drops_for_metrics()
+    );
+
     let _ = writeln!(
         out,
         "# HELP telemt_connections_total Total accepted connections"
@@ -3019,17 +3077,6 @@
             0
         }
     );
-    let _ = writeln!(
-        out,
-        "# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
-    );
-    let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_suppressed gauge");
-    let _ = writeln!(
-        out,
-        "telemt_telemetry_user_series_suppressed {}",
-        if user_enabled { 0 } else { 1 }
-    );
-
     let ip_memory = ip_tracker.memory_stats().await;
     let _ = writeln!(
         out,
@@ -3071,11 +3118,46 @@
         "telemt_ip_tracker_cleanup_queue_len {}",
         ip_memory.cleanup_queue_len
     );
+    let _ = writeln!(
+        out,
+        "# HELP telemt_ip_tracker_cleanup_total Release cleanups deferred through the cleanup queue"
+    );
+    let _ = writeln!(out, "# TYPE telemt_ip_tracker_cleanup_total counter");
+    let _ = writeln!(
+        out,
+        "telemt_ip_tracker_cleanup_total{{path=\"deferred\"}} {}",
+        ip_memory.cleanup_deferred_releases
+    );
+    let _ = writeln!(
+        out,
+        "# HELP telemt_ip_tracker_cap_rejects_total New connection rejects caused by global IP tracker caps"
+    );
+    let _ = writeln!(out, "# TYPE telemt_ip_tracker_cap_rejects_total counter");
+    let _ = writeln!(
+        out,
+        "telemt_ip_tracker_cap_rejects_total{{scope=\"active\"}} {}",
+        ip_memory.active_cap_rejects
+    );
+    let _ = writeln!(
+        out,
+        "telemt_ip_tracker_cap_rejects_total{{scope=\"recent\"}} {}",
+        ip_memory.recent_cap_rejects
+    );
+
+    let mut user_stats_emitted = 0usize;
+    let mut user_stats_suppressed = 0usize;
+    let mut unique_ip_emitted = 0usize;
+    let mut unique_ip_suppressed = 0usize;
 
     if user_enabled {
         for entry in stats.iter_user_stats() {
+            if user_stats_emitted >= USER_LABELED_METRICS_MAX_USERS {
+                user_stats_suppressed = user_stats_suppressed.saturating_add(1);
+                continue;
+            }
             let user = entry.key();
             let s = entry.value();
+            user_stats_emitted = user_stats_emitted.saturating_add(1);
             let _ = writeln!(
                 out,
                 "telemt_user_connections_total{{user=\"{}\"}} {}",
@@ -3117,7 +3199,7 @@
             );
         }
 
-        let ip_stats = ip_tracker.get_stats().await;
+        let ip_stats = ip_tracker.get_stats_snapshot().await;
         let ip_counts: HashMap<String, usize> = ip_stats
             .into_iter()
             .map(|(user, count, _)| (user, count))
             .collect();
@@ -3129,7 +3211,7 @@
         unique_users.extend(ip_counts.keys().cloned());
         let unique_users_vec: Vec<String> = unique_users.iter().cloned().collect();
         let recent_counts = ip_tracker
-            .get_recent_counts_for_users(&unique_users_vec)
+            .get_recent_counts_for_users_snapshot(&unique_users_vec)
             .await;
 
         let _ = writeln!(
@@ -3154,6 +3236,11 @@
         let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge");
 
         for user in unique_users {
+            if unique_ip_emitted >= USER_LABELED_METRICS_MAX_USERS {
+                unique_ip_suppressed = unique_ip_suppressed.saturating_add(1);
+                continue;
+            }
+            unique_ip_emitted = unique_ip_emitted.saturating_add(1);
             let current = ip_counts.get(&user).copied().unwrap_or(0);
             let limit = config
                 .access
@@ -3193,6 +3280,46 @@
         }
     }
 
+    let _ = writeln!(
+        out,
+        "# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
+    );
+    let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_suppressed gauge");
+    let _ = writeln!(
+        out,
+        "telemt_telemetry_user_series_suppressed {}",
+        if user_enabled && user_stats_suppressed == 0 && unique_ip_suppressed == 0 {
+            0
+        } else {
+            1
+        }
+    );
+    let _ = writeln!(
+        out,
+        "# HELP telemt_telemetry_user_series_users User-labeled metric users by export status"
+    );
+    let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_users gauge");
+    let _ = writeln!(
+        out,
+        "telemt_telemetry_user_series_users{{family=\"stats\",status=\"emitted\"}} {}",
+        user_stats_emitted
+    );
+    let _ = writeln!(
+        out,
+        "telemt_telemetry_user_series_users{{family=\"stats\",status=\"suppressed\"}} {}",
+        user_stats_suppressed
+    );
+    let _ = writeln!(
+        out,
+        "telemt_telemetry_user_series_users{{family=\"unique_ip\",status=\"emitted\"}} {}",
+        unique_ip_emitted
+    );
+    let _ = writeln!(
+        out,
+        "telemt_telemetry_user_series_users{{family=\"unique_ip\",status=\"suppressed\"}} {}",
+        unique_ip_suppressed
+    );
+
     out
 }
 
@@ -3406,9 +3533,19 @@ mod tests {
         assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge"));
         assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
         assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
+        assert!(output.contains("# TYPE telemt_stats_user_entries gauge"));
+        assert!(output.contains("# TYPE telemt_telemetry_user_series_users gauge"));
         assert!(output.contains("# TYPE telemt_ip_tracker_users gauge"));
         assert!(output.contains("# TYPE telemt_ip_tracker_entries gauge"));
         assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_queue_len gauge"));
+        assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_total counter"));
+        assert!(output.contains("# TYPE telemt_ip_tracker_cap_rejects_total counter"));
+        assert!(output.contains("# TYPE telemt_tls_fetch_profile_cache_entries gauge"));
+        assert!(output.contains("# TYPE telemt_tls_fetch_profile_cache_cap_drops_total counter"));
+        assert!(output.contains("# TYPE telemt_tls_front_full_cert_budget_ips gauge"));
+        assert!(
+            output.contains("# TYPE telemt_tls_front_full_cert_budget_cap_drops_total counter")
+        );
     }
 
     #[tokio::test]
diff --git a/src/proxy/client.rs b/src/proxy/client.rs
index e28d0b6..90dfee0 100644
--- a/src/proxy/client.rs
+++ b/src/proxy/client.rs
@@ -1431,8 +1431,8 @@ impl RunningClientHandler {
 
     /// Main dispatch after successful handshake.
     /// Two modes:
-    /// - Direct: TCP relay to TG DC (existing behavior)
-    /// - Middle Proxy: RPC multiplex through ME pool (new — supports CDN DCs)
+    /// - Direct: TCP relay to TG DC (existing behavior)
+    /// - Middle Proxy: RPC multiplex through ME pool (supports CDN DCs)
     #[cfg(test)]
     async fn handle_authenticated_static(
         client_reader: CryptoReader,
diff --git a/src/proxy/tests/direct_relay_security_tests.rs b/src/proxy/tests/direct_relay_security_tests.rs
index 193ff7b..9554752 100644
--- a/src/proxy/tests/direct_relay_security_tests.rs
+++ b/src/proxy/tests/direct_relay_security_tests.rs
@@ -669,6 +669,13 @@ fn adversarial_check_then_symlink_flip_is_blocked_by_nofollow_open() {
         "telemt-unknown-dc-check-open-race-{}",
         std::process::id()
     ));
+    if let Ok(meta) = fs::symlink_metadata(&parent) {
+        if meta.file_type().is_symlink() || meta.is_file() {
+            fs::remove_file(&parent).expect("stale check-open-race path must be removable");
+        } else {
+            fs::remove_dir_all(&parent).expect("stale check-open-race parent must be removable");
+        }
+    }
     fs::create_dir_all(&parent).expect("check-open-race parent must be creatable");
 
     let target = parent.join("unknown-dc.log");
diff --git a/src/stats/beobachten.rs b/src/stats/beobachten.rs
index 79b2bcd..5684455 100644
--- a/src/stats/beobachten.rs
+++ b/src/stats/beobachten.rs
@@ -74,16 +74,21 @@ impl BeobachtenStore {
         }
 
         let now = Instant::now();
-        let mut guard = self.inner.lock();
-        Self::cleanup(&mut guard, now, ttl);
-        guard.last_cleanup = Some(now);
+        let entries = {
+            let mut guard = self.inner.lock();
+            Self::cleanup(&mut guard, now, ttl);
+            guard.last_cleanup = Some(now);
+
+            guard
+                .entries
+                .iter()
+                .map(|((class, ip), entry)| (class.clone(), *ip, entry.tries))
+                .collect::<Vec<_>>()
+        };
 
         let mut grouped = BTreeMap::<String, Vec<_>>::new();
-        for ((class, ip), entry) in &guard.entries {
-            grouped
-                .entry(class.clone())
-                .or_default()
-                .push((*ip, entry.tries));
+        for (class, ip, tries) in entries {
+            grouped.entry(class).or_default().push((ip, tries));
         }
 
         if grouped.is_empty() {
diff --git a/src/stats/mod.rs b/src/stats/mod.rs
index 9678f2a..cff9571 100644
--- a/src/stats/mod.rs
+++ b/src/stats/mod.rs
@@ -2477,6 +2477,11 @@ impl Stats {
         self.user_stats.iter()
     }
 
+    /// Current number of retained per-user stats entries.
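+    /// Backs the `telemt_stats_user_entries` gauge on `/metrics`.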
+    pub fn user_stats_len(&self) -> usize {
+        self.user_stats.len()
+    }
+
     pub fn uptime_secs(&self) -> f64 {
         self.start_time
             .read()
diff --git a/src/stream/tls_stream.rs b/src/stream/tls_stream.rs
index 3f100d1..66a8f82 100644
--- a/src/stream/tls_stream.rs
+++ b/src/stream/tls_stream.rs
@@ -277,6 +277,7 @@ impl StreamState for TlsReaderState {
 pub struct FakeTlsReader<R> {
     upstream: R,
     state: TlsReaderState,
+    body_scratch: Vec<u8>,
 }
 
 impl<R> FakeTlsReader<R> {
@@ -284,6 +285,7 @@
         Self {
             upstream,
             state: TlsReaderState::Idle,
+            body_scratch: Vec::new(),
         }
     }
 
@@ -439,7 +441,13 @@ impl<R: AsyncRead + Unpin> AsyncRead for FakeTlsReader<R> {
                     length,
                     mut buffer,
                 } => {
-                    let result = poll_read_body(&mut this.upstream, cx, &mut buffer, length);
+                    let result = poll_read_body(
+                        &mut this.upstream,
+                        cx,
+                        &mut buffer,
+                        length,
+                        &mut this.body_scratch,
+                    );
 
                     match result {
                         BodyPollResult::Pending => {
@@ -558,34 +566,36 @@ fn poll_read_body(
     cx: &mut Context<'_>,
     buffer: &mut BytesMut,
     target_len: usize,
+    scratch: &mut Vec<u8>,
 ) -> BodyPollResult {
-    // NOTE: This implementation uses a temporary Vec to avoid tricky borrow/lifetime
-    // issues with BytesMut spare capacity and ReadBuf across polls.
-    // It's safe and correct; optimization is possible if needed.
     while buffer.len() < target_len {
         let remaining = target_len - buffer.len();
+        let chunk_len = remaining.min(8192);
 
-        let mut temp = vec![0u8; remaining.min(8192)];
-        let mut read_buf = ReadBuf::new(&mut temp);
-
-        match Pin::new(&mut *upstream).poll_read(cx, &mut read_buf) {
-            Poll::Pending => return BodyPollResult::Pending,
-            Poll::Ready(Err(e)) => return BodyPollResult::Error(e),
-            Poll::Ready(Ok(())) => {
-                let n = read_buf.filled().len();
-                if n == 0 {
-                    return BodyPollResult::Error(Error::new(
-                        ErrorKind::UnexpectedEof,
-                        format!(
-                            "unexpected EOF in TLS body (got {} of {} bytes)",
-                            buffer.len(),
-                            target_len
-                        ),
-                    ));
-                }
-                buffer.extend_from_slice(&temp[..n]);
-            }
+        if scratch.len() < chunk_len {
+            scratch.resize(chunk_len, 0);
         }
+
+        let n = {
+            let mut read_buf = ReadBuf::new(&mut scratch[..chunk_len]);
+            match Pin::new(&mut *upstream).poll_read(cx, &mut read_buf) {
+                Poll::Pending => return BodyPollResult::Pending,
+                Poll::Ready(Err(e)) => return BodyPollResult::Error(e),
+                Poll::Ready(Ok(())) => read_buf.filled().len(),
+            }
+        };
+
+        if n == 0 {
+            return BodyPollResult::Error(Error::new(
+                ErrorKind::UnexpectedEof,
+                format!(
+                    "unexpected EOF in TLS body (got {} of {} bytes)",
+                    buffer.len(),
+                    target_len
+                ),
+            ));
+        }
+        buffer.extend_from_slice(&scratch[..n]);
     }
 
     BodyPollResult::Complete(buffer.split().freeze())
diff --git a/src/tests/ip_tracker_regression_tests.rs b/src/tests/ip_tracker_regression_tests.rs
index 193c9c3..2bca5b6 100644
--- a/src/tests/ip_tracker_regression_tests.rs
+++ b/src/tests/ip_tracker_regression_tests.rs
@@ -559,9 +559,7 @@ async fn mass_reconnect_sync_cleanup_prevents_temporary_reservation_bloat() {
 }
 
 #[tokio::test]
-async fn adversarial_drain_cleanup_queue_race_does_not_cause_false_rejections() {
-    // Regression guard: concurrent cleanup draining must not produce false
-    // limit denials for a new IP when the previous IP is already queued.
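+// With try_lock-based draining a racing admission may legitimately be refused,
+// so this guard now asserts only liveness and that the limit is never exceeded.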
+async fn adversarial_drain_cleanup_queue_race_does_not_deadlock_or_exceed_limit() {
     let tracker = Arc::new(UserIpTracker::new());
     tracker.set_user_limit("racer", 1).await;
     let ip1 = ip_from_idx(1);
@@ -573,7 +571,6 @@
 
     // User disconnects from ip1, queuing it
     tracker.enqueue_cleanup("racer".to_string(), ip1);
 
-    let mut saw_false_rejection = false;
     for _ in 0..100 {
         // Queue cleanup then race explicit drain and check-and-add on the alternative IP.
         tracker.enqueue_cleanup("racer".to_string(), ip1);
@@ -585,22 +582,21 @@
         });
         let handle = tokio::spawn(async move { tracker_b.check_and_add("racer", ip2).await });
 
-        drain_handle.await.unwrap();
-        let res = handle.await.unwrap();
-        if res.is_err() {
-            saw_false_rejection = true;
-            break;
-        }
+        tokio::time::timeout(Duration::from_secs(1), drain_handle)
+            .await
+            .expect("cleanup drain must not deadlock")
+            .unwrap();
+        let _ = tokio::time::timeout(Duration::from_secs(1), handle)
+            .await
+            .expect("admission must not deadlock")
+            .unwrap();
 
-        // Restore baseline for next iteration.
+        assert!(tracker.get_active_ip_count("racer").await <= 1);
+        tracker.drain_cleanup_queue().await;
         tracker.remove_ip("racer", ip2).await;
+        tracker.remove_ip("racer", ip1).await;
         tracker.check_and_add("racer", ip1).await.unwrap();
     }
-
-    assert!(
-        !saw_false_rejection,
-        "Concurrent cleanup draining must not cause false-positive IP denials"
-    );
 }
 
 #[tokio::test]
diff --git a/src/tls_front/cache.rs b/src/tls_front/cache.rs
index 4f71f5a..f18084b 100644
--- a/src/tls_front/cache.rs
+++ b/src/tls_front/cache.rs
@@ -1,8 +1,11 @@
 use std::collections::HashMap;
+use std::collections::hash_map::DefaultHasher;
+use std::hash::{Hash, Hasher};
 use std::net::IpAddr;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
-use std::time::{Duration, Instant, SystemTime};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
 use tokio::sync::RwLock;
 use tokio::time::sleep;
@@ -12,12 +15,30 @@ use crate::tls_front::types::{
     CachedTlsData, ParsedServerHello, TlsBehaviorProfile, TlsFetchResult,
 };
 
+const FULL_CERT_SENT_SWEEP_INTERVAL_SECS: u64 = 30;
+const FULL_CERT_SENT_MAX_IPS: usize = 65_536;
+const FULL_CERT_SENT_SHARDS: usize = 64;
+
+static FULL_CERT_SENT_IPS_GAUGE: AtomicU64 = AtomicU64::new(0);
+static FULL_CERT_SENT_CAP_DROPS: AtomicU64 = AtomicU64::new(0);
+
+/// Current number of IPs tracked by the TLS full-cert budget gate.
+pub(crate) fn full_cert_sent_ips_for_metrics() -> u64 {
+    FULL_CERT_SENT_IPS_GAUGE.load(Ordering::Relaxed)
+}
+
+/// Number of new IPs denied a full-cert budget slot because the cap was reached.
+pub(crate) fn full_cert_sent_cap_drops_for_metrics() -> u64 {
+    FULL_CERT_SENT_CAP_DROPS.load(Ordering::Relaxed)
+}
+
 /// Lightweight in-memory + optional on-disk cache for TLS fronting data.
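+/// Full-cert budget state is sharded by client-IP hash (`FULL_CERT_SENT_SHARDS`)
+/// to spread write-lock contention across shards.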
 #[derive(Debug)]
 pub struct TlsFrontCache {
     memory: RwLock<HashMap<String, Arc<CachedTlsData>>>,
     default: Arc<CachedTlsData>,
-    full_cert_sent: RwLock<HashMap<IpAddr, Instant>>,
+    full_cert_sent_shards: Vec<RwLock<HashMap<IpAddr, Instant>>>,
+    full_cert_sent_last_sweep_epoch_secs: AtomicU64,
     disk_path: PathBuf,
 }
 
@@ -52,7 +73,10 @@ impl TlsFrontCache {
         Self {
             memory: RwLock::new(map),
             default,
-            full_cert_sent: RwLock::new(HashMap::new()),
+            full_cert_sent_shards: (0..FULL_CERT_SENT_SHARDS)
+                .map(|_| RwLock::new(HashMap::new()))
+                .collect(),
+            full_cert_sent_last_sweep_epoch_secs: AtomicU64::new(0),
             disk_path: disk_path.as_ref().to_path_buf(),
         }
     }
@@ -69,22 +93,83 @@ impl TlsFrontCache {
         self.memory.read().await.contains_key(domain)
     }
 
+    fn full_cert_sent_shard_index(client_ip: IpAddr) -> usize {
+        let mut hasher = DefaultHasher::new();
+        client_ip.hash(&mut hasher);
+        (hasher.finish() as usize) % FULL_CERT_SENT_SHARDS
+    }
+
+    fn full_cert_sent_shard(&self, client_ip: IpAddr) -> &RwLock<HashMap<IpAddr, Instant>> {
+        &self.full_cert_sent_shards[Self::full_cert_sent_shard_index(client_ip)]
+    }
+
+    fn decrement_full_cert_sent_entries(amount: usize) {
+        if amount == 0 {
+            return;
+        }
+        let amount = amount as u64;
+        let _ =
+            FULL_CERT_SENT_IPS_GAUGE.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
+                Some(current.saturating_sub(amount))
+            });
+    }
+
+    fn try_reserve_full_cert_sent_entry() -> bool {
+        let mut current = FULL_CERT_SENT_IPS_GAUGE.load(Ordering::Relaxed);
+        loop {
+            if current >= FULL_CERT_SENT_MAX_IPS as u64 {
+                return false;
+            }
+            match FULL_CERT_SENT_IPS_GAUGE.compare_exchange_weak(
+                current,
+                current.saturating_add(1),
+                Ordering::AcqRel,
+                Ordering::Relaxed,
+            ) {
+                Ok(_) => return true,
+                Err(actual) => current = actual,
+            }
+        }
+    }
+
+    async fn sweep_full_cert_sent_shards(&self, now: Instant, ttl: Duration) {
+        for shard in &self.full_cert_sent_shards {
+            let mut guard = shard.write().await;
+            let before = guard.len();
+            guard.retain(|_, seen_at| now.duration_since(*seen_at) < ttl);
+            Self::decrement_full_cert_sent_entries(before.saturating_sub(guard.len()));
+        }
+    }
+
     /// Returns true when full cert payload should be sent for client_ip
     /// according to TTL policy.
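+    /// Expired entries are swept lazily, at most once per
+    /// `FULL_CERT_SENT_SWEEP_INTERVAL_SECS`, so the per-call cost stays low.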
     pub async fn take_full_cert_budget_for_ip(&self, client_ip: IpAddr, ttl: Duration) -> bool {
         if ttl.is_zero() {
-            self.full_cert_sent
-                .write()
-                .await
-                .insert(client_ip, Instant::now());
             return true;
         }
 
         let now = Instant::now();
-        let mut guard = self.full_cert_sent.write().await;
-        guard.retain(|_, seen_at| now.duration_since(*seen_at) < ttl);
+        let now_epoch_secs = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs();
+        let should_sweep = self
+            .full_cert_sent_last_sweep_epoch_secs
+            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |last_sweep| {
+                if now_epoch_secs.saturating_sub(last_sweep) >= FULL_CERT_SENT_SWEEP_INTERVAL_SECS {
+                    Some(now_epoch_secs)
+                } else {
+                    None
+                }
+            })
+            .is_ok();
 
-        match guard.get_mut(&client_ip) {
+        if should_sweep {
+            self.sweep_full_cert_sent_shards(now, ttl).await;
+        }
+
+        let mut guard = self.full_cert_sent_shard(client_ip).write().await;
+        let allowed = match guard.get_mut(&client_ip) {
             Some(seen_at) => {
                 if now.duration_since(*seen_at) >= ttl {
                     *seen_at = now;
@@ -94,12 +179,43 @@ impl TlsFrontCache {
                 }
             }
             None => {
+                if !Self::try_reserve_full_cert_sent_entry() {
+                    FULL_CERT_SENT_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
+                    return false;
+                }
                 guard.insert(client_ip, now);
                 true
             }
+        };
+        allowed
+    }
+
+    #[cfg(test)]
+    async fn insert_full_cert_sent_for_tests(&self, client_ip: IpAddr, seen_at: Instant) {
+        let mut guard = self.full_cert_sent_shard(client_ip).write().await;
+        if guard.insert(client_ip, seen_at).is_none() {
+            FULL_CERT_SENT_IPS_GAUGE.fetch_add(1, Ordering::Relaxed);
         }
     }
 
+    #[cfg(test)]
+    async fn full_cert_sent_is_empty_for_tests(&self) -> bool {
+        for shard in &self.full_cert_sent_shards {
+            if !shard.read().await.is_empty() {
+                return false;
+            }
+        }
+        true
+    }
+
+    #[cfg(test)]
+    async fn full_cert_sent_contains_for_tests(&self, client_ip: IpAddr) -> bool {
+        self.full_cert_sent_shard(client_ip)
+            .read()
+            .await
+            .contains_key(&client_ip)
+    }
+
     pub async fn set(&self, domain: &str, data: CachedTlsData) {
         let mut guard = self.memory.write().await;
         guard.insert(domain.to_string(), Arc::new(data));
@@ -328,10 +444,68 @@ mod tests {
 
     #[tokio::test]
     async fn test_take_full_cert_budget_for_ip_zero_ttl_always_allows_full_payload() {
         let cache = TlsFrontCache::new(&["example.com".to_string()], 1024, "tlsfront-test-cache");
-        let ip: IpAddr = "127.0.0.1".parse().expect("ip");
         let ttl = Duration::ZERO;
-        assert!(cache.take_full_cert_budget_for_ip(ip, ttl).await);
-        assert!(cache.take_full_cert_budget_for_ip(ip, ttl).await);
+        for idx in 0..100_000u32 {
+            let ip = IpAddr::V4(std::net::Ipv4Addr::new(
+                10,
+                ((idx >> 16) & 0xff) as u8,
+                ((idx >> 8) & 0xff) as u8,
+                (idx & 0xff) as u8,
+            ));
+            assert!(cache.take_full_cert_budget_for_ip(ip, ttl).await);
+        }
+
+        assert!(cache.full_cert_sent_is_empty_for_tests().await);
+    }
+
+    #[tokio::test]
+    async fn test_take_full_cert_budget_for_ip_sweeps_expired_entries_when_due() {
+        let cache = TlsFrontCache::new(&["example.com".to_string()], 1024, "tlsfront-test-cache");
+        let stale_ip: IpAddr = "127.0.0.1".parse().expect("ip");
+        let new_ip: IpAddr = "127.0.0.2".parse().expect("ip");
+        let ttl = Duration::from_secs(1);
+        let stale_seen_at = Instant::now()
+            .checked_sub(Duration::from_secs(10))
+            .unwrap_or_else(Instant::now);
+
+        cache
+            .insert_full_cert_sent_for_tests(stale_ip, stale_seen_at)
+            .await;
+        cache
+            .full_cert_sent_last_sweep_epoch_secs
+            .store(0, Ordering::Relaxed);
+
+        assert!(cache.take_full_cert_budget_for_ip(new_ip, ttl).await);
+
+        assert!(!cache.full_cert_sent_contains_for_tests(stale_ip).await);
+        assert!(cache.full_cert_sent_contains_for_tests(new_ip).await);
+    }
+
+    #[tokio::test]
+    async fn test_take_full_cert_budget_for_ip_does_not_sweep_every_call() {
+        let cache = TlsFrontCache::new(&["example.com".to_string()], 1024, "tlsfront-test-cache");
+        let stale_ip: IpAddr = "127.0.0.1".parse().expect("ip");
+        let new_ip: IpAddr = "127.0.0.2".parse().expect("ip");
+        let ttl = Duration::from_secs(1);
+        let stale_seen_at = Instant::now()
+            .checked_sub(Duration::from_secs(10))
+            .unwrap_or_else(Instant::now);
+        let now_epoch_secs = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs();
+
+        cache
+            .insert_full_cert_sent_for_tests(stale_ip, stale_seen_at)
+            .await;
+        cache
+            .full_cert_sent_last_sweep_epoch_secs
+            .store(now_epoch_secs, Ordering::Relaxed);
+
+        assert!(cache.take_full_cert_budget_for_ip(new_ip, ttl).await);
+
+        assert!(cache.full_cert_sent_contains_for_tests(stale_ip).await);
+        assert!(cache.full_cert_sent_contains_for_tests(new_ip).await);
     }
 }
diff --git a/src/tls_front/fetcher.rs b/src/tls_front/fetcher.rs
index 53ec803..3d85d89 100644
--- a/src/tls_front/fetcher.rs
+++ b/src/tls_front/fetcher.rs
@@ -3,7 +3,9 @@ use dashmap::DashMap;
 
 use std::net::SocketAddr;
 use std::sync::Arc;
+use std::sync::Mutex;
 use std::sync::OnceLock;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::time::{Duration, Instant};
 
 use anyhow::{Result, anyhow};
@@ -144,12 +146,37 @@ enum FetchErrorKind {
     Other,
 }
 
+const PROFILE_CACHE_MAX_ENTRIES: usize = 4096;
+
 static PROFILE_CACHE: OnceLock<DashMap<ProfileCacheKey, ProfileCacheValue>> = OnceLock::new();
+static PROFILE_CACHE_INSERT_GUARD: OnceLock<Mutex<()>> = OnceLock::new();
+static PROFILE_CACHE_CAP_DROPS: AtomicU64 = AtomicU64::new(0);
 
 fn profile_cache() -> &'static DashMap<ProfileCacheKey, ProfileCacheValue> {
     PROFILE_CACHE.get_or_init(DashMap::new)
 }
 
+fn profile_cache_insert_guard() -> &'static Mutex<()> {
+    PROFILE_CACHE_INSERT_GUARD.get_or_init(|| Mutex::new(()))
+}
+
+fn sweep_expired_profile_cache(ttl: Duration, now: Instant) {
+    if ttl.is_zero() {
+        return;
+    }
+    profile_cache().retain(|_, value| now.saturating_duration_since(value.updated_at) <= ttl);
+}
+
+/// Current number of adaptive TLS fetch profile-cache entries.
+pub(crate) fn profile_cache_entries_for_metrics() -> usize {
+    profile_cache().len()
+}
+
+/// Number of fresh profile-cache winners skipped because the cache was full.
+pub(crate) fn profile_cache_cap_drops_for_metrics() -> u64 {
+    PROFILE_CACHE_CAP_DROPS.load(Ordering::Relaxed)
+}
+
 fn route_hint(
     upstream: Option<&std::sync::Arc>,
     unix_sock: Option<&str>,
@@ -267,6 +294,43 @@ fn remember_profile_success(
     let Some(key) = cache_key else {
         return;
     };
+    remember_profile_success_with_cap(strategy, key, profile, now, PROFILE_CACHE_MAX_ENTRIES);
+}
+
+fn remember_profile_success_with_cap(
+    strategy: &TlsFetchStrategy,
+    key: ProfileCacheKey,
+    profile: TlsFetchProfile,
+    now: Instant,
+    max_entries: usize,
+) {
+    let Ok(_guard) = profile_cache_insert_guard().lock() else {
+        PROFILE_CACHE_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
+        return;
+    };
+    if max_entries == 0 {
+        PROFILE_CACHE_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
+        return;
+    }
+    if profile_cache().contains_key(&key) {
+        profile_cache().insert(
+            key,
+            ProfileCacheValue {
+                profile,
+                updated_at: now,
+            },
+        );
+        return;
+    }
+    if profile_cache().len() >= max_entries {
+        // TLS fetch is control-plane work; sweeping under a tiny mutex keeps
+        // profile-cache cardinality hard-bounded without touching relay hot paths.
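+        // If the sweep frees no room, the fresh winner is dropped and counted below.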
+        sweep_expired_profile_cache(strategy.profile_cache_ttl, now);
+    }
+    if profile_cache().len() >= max_entries {
+        PROFILE_CACHE_CAP_DROPS.fetch_add(1, Ordering::Relaxed);
+        return;
+    }
     profile_cache().insert(
         key,
         ProfileCacheValue {
             profile,
             updated_at: now,
diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs
index 404e864..50861eb 100644
--- a/src/transport/middle_proxy/pool.rs
+++ b/src/transport/middle_proxy/pool.rs
@@ -618,13 +618,9 @@ impl MePool {
                 me_route_hybrid_max_wait: Duration::from_millis(
                     me_route_hybrid_max_wait_ms.max(50),
                 ),
-                me_route_blocking_send_timeout: if me_route_blocking_send_timeout_ms == 0 {
-                    None
-                } else {
-                    Some(Duration::from_millis(
-                        me_route_blocking_send_timeout_ms.min(5_000),
-                    ))
-                },
+                me_route_blocking_send_timeout: Some(Duration::from_millis(
+                    me_route_blocking_send_timeout_ms.clamp(1, 5_000),
+                )),
                 me_route_last_success_epoch_ms: AtomicU64::new(0),
                 me_route_hybrid_timeout_warn_epoch_ms: AtomicU64::new(0),
                 me_async_recovery_last_trigger_epoch_ms: AtomicU64::new(0),