From 1f90e28871aebe54a80e11cf74b59b44942da859 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 30 Apr 2026 10:43:27 +0300 Subject: [PATCH] Cap scanner-sensitive Caches and IP-Tracker Cardinality --- src/ip_tracker.rs | 116 +++++++++++++++++++++++++++++++++++---- src/metrics.rs | 74 +++++++++++++++++++++++++ src/tls_front/cache.rs | 25 ++++++++- src/tls_front/fetcher.rs | 64 +++++++++++++++++++++ 4 files changed, 265 insertions(+), 14 deletions(-) diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs index cc10d8e..b3eaa48 100644 --- a/src/ip_tracker.rs +++ b/src/ip_tracker.rs @@ -14,11 +14,18 @@ use tokio::sync::{Mutex as AsyncMutex, RwLock, RwLockWriteGuard}; use crate::config::UserMaxUniqueIpsMode; const CLEANUP_DRAIN_BATCH_LIMIT: usize = 1024; +const MAX_ACTIVE_IP_ENTRIES: u64 = 131_072; +const MAX_RECENT_IP_ENTRIES: u64 = 262_144; +/// Tracks active and recent client IPs for per-user admission control. #[derive(Debug, Clone)] pub struct UserIpTracker { active_ips: Arc>>>, recent_ips: Arc>>>, + active_entry_count: Arc, + recent_entry_count: Arc, + active_cap_rejects: Arc, + recent_cap_rejects: Arc, max_ips: Arc>>, default_max_ips: Arc>, limit_mode: Arc>, @@ -28,13 +35,23 @@ pub struct UserIpTracker { cleanup_drain_lock: Arc>, } +/// Point-in-time memory counters for user/IP limiter state. #[derive(Debug, Clone, Copy)] pub struct UserIpTrackerMemoryStats { + /// Number of users with active IP state. pub active_users: usize, + /// Number of users with recent IP state. pub recent_users: usize, + /// Number of active `(user, ip)` entries. pub active_entries: usize, + /// Number of recent-window `(user, ip)` entries. pub recent_entries: usize, + /// Number of deferred disconnect cleanups waiting to be drained. pub cleanup_queue_len: usize, + /// Number of new connections rejected by the global active-entry cap. + pub active_cap_rejects: u64, + /// Number of new connections rejected by the global recent-entry cap. + pub recent_cap_rejects: u64, } impl UserIpTracker { @@ -42,6 +59,10 @@ impl UserIpTracker { Self { active_ips: Arc::new(RwLock::new(HashMap::new())), recent_ips: Arc::new(RwLock::new(HashMap::new())), + active_entry_count: Arc::new(AtomicU64::new(0)), + recent_entry_count: Arc::new(AtomicU64::new(0)), + active_cap_rejects: Arc::new(AtomicU64::new(0)), + recent_cap_rejects: Arc::new(AtomicU64::new(0)), max_ips: Arc::new(RwLock::new(HashMap::new())), default_max_ips: Arc::new(RwLock::new(0)), limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)), @@ -52,6 +73,16 @@ impl UserIpTracker { } } + fn decrement_counter(counter: &AtomicU64, amount: usize) { + if amount == 0 { + return; + } + let amount = amount as u64; + let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| { + Some(current.saturating_sub(amount)) + }); + } + pub fn enqueue_cleanup(&self, user: String, ip: IpAddr) { match self.cleanup_queue.lock() { Ok(mut queue) => { @@ -136,6 +167,7 @@ impl UserIpTracker { } let mut active_ips = self.active_ips.write().await; + let mut removed_active_entries = 0usize; for ((user, ip), pending_count) in to_remove { if pending_count == 0 { continue; @@ -145,7 +177,9 @@ impl UserIpTracker { if *count > pending_count { *count -= pending_count; } else { - user_ips.remove(&ip); + if user_ips.remove(&ip).is_some() { + removed_active_entries = removed_active_entries.saturating_add(1); + } } } if user_ips.is_empty() { @@ -153,6 +187,7 @@ impl UserIpTracker { } } } + Self::decrement_counter(&self.active_entry_count, removed_active_entries); } fn now_epoch_secs() -> u64 { @@ -204,9 +239,12 @@ impl UserIpTracker { let now = Instant::now(); let (mut active_ips, mut recent_ips) = self.active_and_recent_write().await; + let mut pruned_recent_entries = 0usize; for user_recent in recent_ips.values_mut() { - Self::prune_recent(user_recent, now, window); + pruned_recent_entries = + pruned_recent_entries.saturating_add(Self::prune_recent(user_recent, now, window)); } + Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries); let mut users = Vec::::with_capacity(active_ips.len().saturating_add(recent_ips.len())); @@ -250,6 +288,8 @@ impl UserIpTracker { active_entries, recent_entries, cleanup_queue_len, + active_cap_rejects: self.active_cap_rejects.load(Ordering::Relaxed), + recent_cap_rejects: self.recent_cap_rejects.load(Ordering::Relaxed), } } @@ -280,11 +320,17 @@ impl UserIpTracker { max_ips.clone_from(limits); } - fn prune_recent(user_recent: &mut HashMap, now: Instant, window: Duration) { + fn prune_recent( + user_recent: &mut HashMap, + now: Instant, + window: Duration, + ) -> usize { if user_recent.is_empty() { - return; + return 0; } + let before = user_recent.len(); user_recent.retain(|_, seen_at| now.duration_since(*seen_at) <= window); + before.saturating_sub(user_recent.len()) } pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> { @@ -310,15 +356,29 @@ impl UserIpTracker { let user_recent = recent_ips .entry(username.to_string()) .or_insert_with(HashMap::new); - Self::prune_recent(user_recent, now, window); + let pruned_recent_entries = Self::prune_recent(user_recent, now, window); + Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries); + let recent_contains_ip = user_recent.contains_key(&ip); if let Some(count) = user_active.get_mut(&ip) { + if !recent_contains_ip + && self.recent_entry_count.load(Ordering::Relaxed) >= MAX_RECENT_IP_ENTRIES + { + self.recent_cap_rejects.fetch_add(1, Ordering::Relaxed); + return Err(format!( + "IP tracker recent entry cap reached: entries={}/{}", + self.recent_entry_count.load(Ordering::Relaxed), + MAX_RECENT_IP_ENTRIES + )); + } *count = count.saturating_add(1); - user_recent.insert(ip, now); + if user_recent.insert(ip, now).is_none() { + self.recent_entry_count.fetch_add(1, Ordering::Relaxed); + } return Ok(()); } - let is_new_ip = !user_recent.contains_key(&ip); + let is_new_ip = !recent_contains_ip; if let Some(limit) = limit { let active_limit_reached = user_active.len() >= limit; @@ -342,26 +402,51 @@ impl UserIpTracker { } } - user_active.insert(ip, 1); - user_recent.insert(ip, now); + if self.active_entry_count.load(Ordering::Relaxed) >= MAX_ACTIVE_IP_ENTRIES { + self.active_cap_rejects.fetch_add(1, Ordering::Relaxed); + return Err(format!( + "IP tracker active entry cap reached: entries={}/{}", + self.active_entry_count.load(Ordering::Relaxed), + MAX_ACTIVE_IP_ENTRIES + )); + } + if is_new_ip && self.recent_entry_count.load(Ordering::Relaxed) >= MAX_RECENT_IP_ENTRIES { + self.recent_cap_rejects.fetch_add(1, Ordering::Relaxed); + return Err(format!( + "IP tracker recent entry cap reached: entries={}/{}", + self.recent_entry_count.load(Ordering::Relaxed), + MAX_RECENT_IP_ENTRIES + )); + } + + if user_active.insert(ip, 1).is_none() { + self.active_entry_count.fetch_add(1, Ordering::Relaxed); + } + if user_recent.insert(ip, now).is_none() { + self.recent_entry_count.fetch_add(1, Ordering::Relaxed); + } Ok(()) } pub async fn remove_ip(&self, username: &str, ip: IpAddr) { self.maybe_compact_empty_users().await; let mut active_ips = self.active_ips.write().await; + let mut removed_active_entries = 0usize; if let Some(user_ips) = active_ips.get_mut(username) { if let Some(count) = user_ips.get_mut(&ip) { if *count > 1 { *count -= 1; } else { - user_ips.remove(&ip); + if user_ips.remove(&ip).is_some() { + removed_active_entries = 1; + } } } if user_ips.is_empty() { active_ips.remove(username); } } + Self::decrement_counter(&self.active_entry_count, removed_active_entries); } pub async fn get_recent_counts_for_users(&self, users: &[String]) -> HashMap { @@ -478,20 +563,27 @@ impl UserIpTracker { pub async fn clear_user_ips(&self, username: &str) { let mut active_ips = self.active_ips.write().await; - active_ips.remove(username); + let removed_active_entries = active_ips + .remove(username) + .map(|ips| ips.len()) + .unwrap_or(0); drop(active_ips); + Self::decrement_counter(&self.active_entry_count, removed_active_entries); let mut recent_ips = self.recent_ips.write().await; - recent_ips.remove(username); + let removed_recent_entries = recent_ips.remove(username).map(|ips| ips.len()).unwrap_or(0); + Self::decrement_counter(&self.recent_entry_count, removed_recent_entries); } pub async fn clear_all(&self) { let mut active_ips = self.active_ips.write().await; active_ips.clear(); drop(active_ips); + self.active_entry_count.store(0, Ordering::Relaxed); let mut recent_ips = self.recent_ips.write().await; recent_ips.clear(); + self.recent_entry_count.store(0, Ordering::Relaxed); } pub async fn is_ip_active(&self, username: &str, ip: IpAddr) -> bool { diff --git a/src/metrics.rs b/src/metrics.rs index 48bc1f0..75f22cf 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -18,6 +18,8 @@ use crate::ip_tracker::UserIpTracker; use crate::proxy::shared_state::ProxySharedState; use crate::stats::Stats; use crate::stats::beobachten::BeobachtenStore; +use crate::tls_front::cache; +use crate::tls_front::fetcher; use crate::transport::{ListenOptions, create_listener}; pub async fn serve( @@ -366,6 +368,56 @@ async fn render_metrics( stats.get_buffer_pool_in_use_gauge() ); + let _ = writeln!( + out, + "# HELP telemt_tls_fetch_profile_cache_entries Current adaptive TLS fetch profile-cache entries" + ); + let _ = writeln!( + out, + "# TYPE telemt_tls_fetch_profile_cache_entries gauge" + ); + let _ = writeln!( + out, + "telemt_tls_fetch_profile_cache_entries {}", + fetcher::profile_cache_entries_for_metrics() + ); + let _ = writeln!( + out, + "# HELP telemt_tls_fetch_profile_cache_cap_drops_total Profile-cache winner inserts skipped because the cache cap was reached" + ); + let _ = writeln!( + out, + "# TYPE telemt_tls_fetch_profile_cache_cap_drops_total counter" + ); + let _ = writeln!( + out, + "telemt_tls_fetch_profile_cache_cap_drops_total {}", + fetcher::profile_cache_cap_drops_for_metrics() + ); + let _ = writeln!( + out, + "# HELP telemt_tls_front_full_cert_budget_ips Current IP entries tracked by TLS full-cert budget" + ); + let _ = writeln!(out, "# TYPE telemt_tls_front_full_cert_budget_ips gauge"); + let _ = writeln!( + out, + "telemt_tls_front_full_cert_budget_ips {}", + cache::full_cert_sent_ips_for_metrics() + ); + let _ = writeln!( + out, + "# HELP telemt_tls_front_full_cert_budget_cap_drops_total New IPs denied full-cert budget tracking because the cap was reached" + ); + let _ = writeln!( + out, + "# TYPE telemt_tls_front_full_cert_budget_cap_drops_total counter" + ); + let _ = writeln!( + out, + "telemt_tls_front_full_cert_budget_cap_drops_total {}", + cache::full_cert_sent_cap_drops_for_metrics() + ); + let _ = writeln!( out, "# HELP telemt_connections_total Total accepted connections" @@ -3071,6 +3123,21 @@ async fn render_metrics( "telemt_ip_tracker_cleanup_queue_len {}", ip_memory.cleanup_queue_len ); + let _ = writeln!( + out, + "# HELP telemt_ip_tracker_cap_rejects_total New connection rejects caused by global IP tracker caps" + ); + let _ = writeln!(out, "# TYPE telemt_ip_tracker_cap_rejects_total counter"); + let _ = writeln!( + out, + "telemt_ip_tracker_cap_rejects_total{{scope=\"active\"}} {}", + ip_memory.active_cap_rejects + ); + let _ = writeln!( + out, + "telemt_ip_tracker_cap_rejects_total{{scope=\"recent\"}} {}", + ip_memory.recent_cap_rejects + ); if user_enabled { for entry in stats.iter_user_stats() { @@ -3409,6 +3476,13 @@ mod tests { assert!(output.contains("# TYPE telemt_ip_tracker_users gauge")); assert!(output.contains("# TYPE telemt_ip_tracker_entries gauge")); assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_queue_len gauge")); + assert!(output.contains("# TYPE telemt_ip_tracker_cap_rejects_total counter")); + assert!(output.contains("# TYPE telemt_tls_fetch_profile_cache_entries gauge")); + assert!(output.contains("# TYPE telemt_tls_fetch_profile_cache_cap_drops_total counter")); + assert!(output.contains("# TYPE telemt_tls_front_full_cert_budget_ips gauge")); + assert!( + output.contains("# TYPE telemt_tls_front_full_cert_budget_cap_drops_total counter") + ); } #[tokio::test] diff --git a/src/tls_front/cache.rs b/src/tls_front/cache.rs index 4a571d0..8c2b6e5 100644 --- a/src/tls_front/cache.rs +++ b/src/tls_front/cache.rs @@ -14,6 +14,20 @@ use crate::tls_front::types::{ }; const FULL_CERT_SENT_SWEEP_INTERVAL_SECS: u64 = 30; +const FULL_CERT_SENT_MAX_IPS: usize = 65_536; + +static FULL_CERT_SENT_IPS_GAUGE: AtomicU64 = AtomicU64::new(0); +static FULL_CERT_SENT_CAP_DROPS: AtomicU64 = AtomicU64::new(0); + +/// Current number of IPs tracked by the TLS full-cert budget gate. +pub(crate) fn full_cert_sent_ips_for_metrics() -> u64 { + FULL_CERT_SENT_IPS_GAUGE.load(Ordering::Relaxed) +} + +/// Number of new IPs denied a full-cert budget slot because the cap was reached. +pub(crate) fn full_cert_sent_cap_drops_for_metrics() -> u64 { + FULL_CERT_SENT_CAP_DROPS.load(Ordering::Relaxed) +} /// Lightweight in-memory + optional on-disk cache for TLS fronting data. #[derive(Debug)] @@ -104,7 +118,7 @@ impl TlsFrontCache { guard.retain(|_, seen_at| now.duration_since(*seen_at) < ttl); } - match guard.get_mut(&client_ip) { + let allowed = match guard.get_mut(&client_ip) { Some(seen_at) => { if now.duration_since(*seen_at) >= ttl { *seen_at = now; @@ -114,10 +128,17 @@ impl TlsFrontCache { } } None => { + if guard.len() >= FULL_CERT_SENT_MAX_IPS { + FULL_CERT_SENT_CAP_DROPS.fetch_add(1, Ordering::Relaxed); + FULL_CERT_SENT_IPS_GAUGE.store(guard.len() as u64, Ordering::Relaxed); + return false; + } guard.insert(client_ip, now); true } - } + }; + FULL_CERT_SENT_IPS_GAUGE.store(guard.len() as u64, Ordering::Relaxed); + allowed } pub async fn set(&self, domain: &str, data: CachedTlsData) { diff --git a/src/tls_front/fetcher.rs b/src/tls_front/fetcher.rs index 53ec803..3d85d89 100644 --- a/src/tls_front/fetcher.rs +++ b/src/tls_front/fetcher.rs @@ -3,7 +3,9 @@ use dashmap::DashMap; use std::net::SocketAddr; use std::sync::Arc; +use std::sync::Mutex; use std::sync::OnceLock; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant}; use anyhow::{Result, anyhow}; @@ -144,12 +146,37 @@ enum FetchErrorKind { Other, } +const PROFILE_CACHE_MAX_ENTRIES: usize = 4096; + static PROFILE_CACHE: OnceLock> = OnceLock::new(); +static PROFILE_CACHE_INSERT_GUARD: OnceLock> = OnceLock::new(); +static PROFILE_CACHE_CAP_DROPS: AtomicU64 = AtomicU64::new(0); fn profile_cache() -> &'static DashMap { PROFILE_CACHE.get_or_init(DashMap::new) } +fn profile_cache_insert_guard() -> &'static Mutex<()> { + PROFILE_CACHE_INSERT_GUARD.get_or_init(|| Mutex::new(())) +} + +fn sweep_expired_profile_cache(ttl: Duration, now: Instant) { + if ttl.is_zero() { + return; + } + profile_cache().retain(|_, value| now.saturating_duration_since(value.updated_at) <= ttl); +} + +/// Current number of adaptive TLS fetch profile-cache entries. +pub(crate) fn profile_cache_entries_for_metrics() -> usize { + profile_cache().len() +} + +/// Number of fresh profile-cache winners skipped because the cache was full. +pub(crate) fn profile_cache_cap_drops_for_metrics() -> u64 { + PROFILE_CACHE_CAP_DROPS.load(Ordering::Relaxed) +} + fn route_hint( upstream: Option<&std::sync::Arc>, unix_sock: Option<&str>, @@ -267,6 +294,43 @@ fn remember_profile_success( let Some(key) = cache_key else { return; }; + remember_profile_success_with_cap(strategy, key, profile, now, PROFILE_CACHE_MAX_ENTRIES); +} + +fn remember_profile_success_with_cap( + strategy: &TlsFetchStrategy, + key: ProfileCacheKey, + profile: TlsFetchProfile, + now: Instant, + max_entries: usize, +) { + let Ok(_guard) = profile_cache_insert_guard().lock() else { + PROFILE_CACHE_CAP_DROPS.fetch_add(1, Ordering::Relaxed); + return; + }; + if max_entries == 0 { + PROFILE_CACHE_CAP_DROPS.fetch_add(1, Ordering::Relaxed); + return; + } + if profile_cache().contains_key(&key) { + profile_cache().insert( + key, + ProfileCacheValue { + profile, + updated_at: now, + }, + ); + return; + } + if profile_cache().len() >= max_entries { + // TLS fetch is control-plane work; sweeping under a tiny mutex keeps + // profile-cache cardinality hard-bounded without touching relay hot paths. + sweep_expired_profile_cache(strategy.profile_cache_ttl, now); + } + if profile_cache().len() >= max_entries { + PROFILE_CACHE_CAP_DROPS.fetch_add(1, Ordering::Relaxed); + return; + } profile_cache().insert( key, ProfileCacheValue {