diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs index b3eaa48..290bd71 100644 --- a/src/ip_tracker.rs +++ b/src/ip_tracker.rs @@ -26,6 +26,8 @@ pub struct UserIpTracker { recent_entry_count: Arc, active_cap_rejects: Arc, recent_cap_rejects: Arc, + cleanup_direct_releases: Arc, + cleanup_deferred_releases: Arc, max_ips: Arc>>, default_max_ips: Arc>, limit_mode: Arc>, @@ -52,6 +54,10 @@ pub struct UserIpTrackerMemoryStats { pub active_cap_rejects: u64, /// Number of new connections rejected by the global recent-entry cap. pub recent_cap_rejects: u64, + /// Number of release cleanups completed without queueing. + pub cleanup_direct_releases: u64, + /// Number of release cleanups deferred through the cleanup queue. + pub cleanup_deferred_releases: u64, } impl UserIpTracker { @@ -63,6 +69,8 @@ impl UserIpTracker { recent_entry_count: Arc::new(AtomicU64::new(0)), active_cap_rejects: Arc::new(AtomicU64::new(0)), recent_cap_rejects: Arc::new(AtomicU64::new(0)), + cleanup_direct_releases: Arc::new(AtomicU64::new(0)), + cleanup_deferred_releases: Arc::new(AtomicU64::new(0)), max_ips: Arc::new(RwLock::new(HashMap::new())), default_max_ips: Arc::new(RwLock::new(0)), limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)), @@ -83,16 +91,67 @@ impl UserIpTracker { }); } + fn apply_active_cleanup( + active_ips: &mut HashMap>, + user: &str, + ip: IpAddr, + pending_count: usize, + ) -> usize { + if pending_count == 0 { + return 0; + } + + let mut remove_user = false; + let mut removed_active_entries = 0usize; + if let Some(user_ips) = active_ips.get_mut(user) { + if let Some(count) = user_ips.get_mut(&ip) { + if *count > pending_count { + *count -= pending_count; + } else if user_ips.remove(&ip).is_some() { + removed_active_entries = 1; + } + } + remove_user = user_ips.is_empty(); + } + if remove_user { + active_ips.remove(user); + } + removed_active_entries + } + + fn try_cleanup_active_ip(&self, user: &str, ip: IpAddr) -> bool { + let Ok(mut active_ips) = self.active_ips.try_write() else { + return false; + }; + let removed_active_entries = Self::apply_active_cleanup(&mut active_ips, user, ip, 1); + Self::decrement_counter(&self.active_entry_count, removed_active_entries); + true + } + + /// Releases an active IP reservation without waiting, falling back to deferred cleanup. + pub(crate) fn release_or_enqueue_cleanup(&self, user: String, ip: IpAddr) { + if self.try_cleanup_active_ip(&user, ip) { + self.cleanup_direct_releases.fetch_add(1, Ordering::Relaxed); + return; + } + self.enqueue_cleanup(user, ip); + } + + /// Queues a deferred active IP cleanup for a later async drain. pub fn enqueue_cleanup(&self, user: String, ip: IpAddr) { match self.cleanup_queue.lock() { Ok(mut queue) => { let count = queue.entry((user, ip)).or_insert(0); *count = count.saturating_add(1); + self.cleanup_deferred_releases + .fetch_add(1, Ordering::Relaxed); } Err(poisoned) => { let mut queue = poisoned.into_inner(); let count = queue.entry((user.clone(), ip)).or_insert(0); *count = count.saturating_add(1); + self.cleanup_deferred_releases + .fetch_add(1, Ordering::Relaxed); self.cleanup_queue.clear_poison(); tracing::warn!( "UserIpTracker cleanup_queue lock poisoned; recovered and enqueued IP cleanup for {} ({})", @@ -169,23 +228,9 @@ impl UserIpTracker { let mut active_ips = self.active_ips.write().await; let mut removed_active_entries = 0usize; for ((user, ip), pending_count) in to_remove { - if pending_count == 0 { - continue; - } - if let Some(user_ips) = active_ips.get_mut(&user) { - if let Some(count) = user_ips.get_mut(&ip) { - if *count > pending_count { - *count -= pending_count; - } else { - if user_ips.remove(&ip).is_some() { - removed_active_entries = removed_active_entries.saturating_add(1); - } - } - } - if user_ips.is_empty() { - active_ips.remove(&user); - } - } + removed_active_entries = removed_active_entries.saturating_add( + Self::apply_active_cleanup(&mut active_ips, &user, ip, pending_count), + ); } Self::decrement_counter(&self.active_entry_count, removed_active_entries); } @@ -290,6 +335,8 @@ impl UserIpTracker { cleanup_queue_len, active_cap_rejects: self.active_cap_rejects.load(Ordering::Relaxed), recent_cap_rejects: self.recent_cap_rejects.load(Ordering::Relaxed), + cleanup_direct_releases: self.cleanup_direct_releases.load(Ordering::Relaxed), + cleanup_deferred_releases: self.cleanup_deferred_releases.load(Ordering::Relaxed), } } diff --git a/src/metrics.rs b/src/metrics.rs index 75f22cf..3edf256 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -3123,6 +3123,21 @@ async fn render_metrics( "telemt_ip_tracker_cleanup_queue_len {}", ip_memory.cleanup_queue_len ); + let _ = writeln!( + out, + "# HELP telemt_ip_tracker_cleanup_total Release cleanup decisions by path" + ); + let _ = writeln!(out, "# TYPE telemt_ip_tracker_cleanup_total counter"); + let _ = writeln!( + out, + "telemt_ip_tracker_cleanup_total{{path=\"direct\"}} {}", + ip_memory.cleanup_direct_releases + ); + let _ = writeln!( + out, + "telemt_ip_tracker_cleanup_total{{path=\"deferred\"}} {}", + ip_memory.cleanup_deferred_releases + ); let _ = writeln!( out, "# HELP telemt_ip_tracker_cap_rejects_total New connection rejects caused by global IP tracker caps" @@ -3476,6 +3491,7 @@ mod tests { assert!(output.contains("# TYPE telemt_ip_tracker_users gauge")); assert!(output.contains("# TYPE telemt_ip_tracker_entries gauge")); assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_queue_len gauge")); + assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_total counter")); assert!(output.contains("# TYPE telemt_ip_tracker_cap_rejects_total counter")); assert!(output.contains("# TYPE telemt_tls_fetch_profile_cache_entries gauge")); assert!(output.contains("# TYPE telemt_tls_fetch_profile_cache_cap_drops_total counter")); diff --git a/src/proxy/client.rs b/src/proxy/client.rs index e28d0b6..1ce74d6 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -73,7 +73,8 @@ impl Drop for UserConnectionReservation { self.active = false; self.stats.decrement_user_curr_connects(&self.user); if self.tracks_ip { - self.ip_tracker.enqueue_cleanup(self.user.clone(), self.ip); + self.ip_tracker + .release_or_enqueue_cleanup(self.user.clone(), self.ip); } } }