Reduce lock-free IP-tracker cleanup backlog

This commit is contained in:
Alexey
2026-04-30 10:51:04 +03:00
parent 1f90e28871
commit 61f9af7ffc
3 changed files with 82 additions and 18 deletions

View File

@@ -26,6 +26,8 @@ pub struct UserIpTracker {
recent_entry_count: Arc<AtomicU64>,
active_cap_rejects: Arc<AtomicU64>,
recent_cap_rejects: Arc<AtomicU64>,
cleanup_direct_releases: Arc<AtomicU64>,
cleanup_deferred_releases: Arc<AtomicU64>,
max_ips: Arc<RwLock<HashMap<String, usize>>>,
default_max_ips: Arc<RwLock<usize>>,
limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
@@ -52,6 +54,10 @@ pub struct UserIpTrackerMemoryStats {
pub active_cap_rejects: u64,
/// Number of new connections rejected by the global recent-entry cap.
pub recent_cap_rejects: u64,
/// Number of release cleanups completed without queueing.
pub cleanup_direct_releases: u64,
/// Number of release cleanups deferred through the cleanup queue.
pub cleanup_deferred_releases: u64,
}
impl UserIpTracker {
@@ -63,6 +69,8 @@ impl UserIpTracker {
recent_entry_count: Arc::new(AtomicU64::new(0)),
active_cap_rejects: Arc::new(AtomicU64::new(0)),
recent_cap_rejects: Arc::new(AtomicU64::new(0)),
cleanup_direct_releases: Arc::new(AtomicU64::new(0)),
cleanup_deferred_releases: Arc::new(AtomicU64::new(0)),
max_ips: Arc::new(RwLock::new(HashMap::new())),
default_max_ips: Arc::new(RwLock::new(0)),
limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)),
@@ -83,16 +91,67 @@ impl UserIpTracker {
});
}
/// Applies `pending_count` deferred releases for `(user, ip)` against the
/// active-IP map, pruning the per-user map when it becomes empty.
///
/// Returns the number of `(user, ip)` entries fully removed (0 or 1), so the
/// caller can adjust the global active-entry counter.
fn apply_active_cleanup(
    active_ips: &mut HashMap<String, HashMap<IpAddr, usize>>,
    user: &str,
    ip: IpAddr,
    pending_count: usize,
) -> usize {
    // Nothing to release: leave the map untouched.
    if pending_count == 0 {
        return 0;
    }
    let mut removed_entries = 0usize;
    let mut prune_user = false;
    if let Some(user_ips) = active_ips.get_mut(user) {
        match user_ips.get_mut(&ip) {
            // More live reservations than pending releases: just decrement.
            Some(count) if *count > pending_count => *count -= pending_count,
            // Count would reach (or pass) zero: drop the whole entry.
            Some(_) => {
                user_ips.remove(&ip);
                removed_entries = 1;
            }
            None => {}
        }
        prune_user = user_ips.is_empty();
    }
    // Remove outside the borrow of `user_ips` to satisfy the borrow checker.
    if prune_user {
        active_ips.remove(user);
    }
    removed_entries
}
/// Attempts an immediate cleanup of one active reservation for `(user, ip)`.
///
/// Returns `false` when the active-IP write lock is contended, in which case
/// the caller must fall back to the deferred cleanup queue.
fn try_cleanup_active_ip(&self, user: &str, ip: IpAddr) -> bool {
    match self.active_ips.try_write() {
        Ok(mut active_ips) => {
            let removed = Self::apply_active_cleanup(&mut active_ips, user, ip, 1);
            Self::decrement_counter(&self.active_entry_count, removed);
            true
        }
        // Lock held elsewhere right now; do not block the release path.
        Err(_) => false,
    }
}
/// Releases an active IP reservation without waiting, falling back to deferred cleanup.
///
/// Tries the lock-free fast path first; on contention the `(user, ip)` pair is
/// pushed onto the cleanup queue for a later async drain. Each outcome bumps
/// its corresponding metrics counter (direct vs. deferred).
pub(crate) fn release_or_enqueue_cleanup(&self, user: String, ip: IpAddr) {
    let cleaned_inline = self.try_cleanup_active_ip(&user, ip);
    if cleaned_inline {
        self.cleanup_direct_releases.fetch_add(1, Ordering::Relaxed);
    } else {
        // `enqueue_cleanup` accounts for the deferred counter itself.
        self.enqueue_cleanup(user, ip);
    }
}
/// Queues a deferred active IP cleanup for a later async drain.
pub fn enqueue_cleanup(&self, user: String, ip: IpAddr) {
match self.cleanup_queue.lock() {
Ok(mut queue) => {
let count = queue.entry((user, ip)).or_insert(0);
*count = count.saturating_add(1);
self.cleanup_deferred_releases
.fetch_add(1, Ordering::Relaxed);
}
Err(poisoned) => {
let mut queue = poisoned.into_inner();
let count = queue.entry((user.clone(), ip)).or_insert(0);
*count = count.saturating_add(1);
self.cleanup_deferred_releases
.fetch_add(1, Ordering::Relaxed);
self.cleanup_queue.clear_poison();
tracing::warn!(
"UserIpTracker cleanup_queue lock poisoned; recovered and enqueued IP cleanup for {} ({})",
@@ -169,23 +228,9 @@ impl UserIpTracker {
let mut active_ips = self.active_ips.write().await;
let mut removed_active_entries = 0usize;
for ((user, ip), pending_count) in to_remove {
if pending_count == 0 {
continue;
}
if let Some(user_ips) = active_ips.get_mut(&user) {
if let Some(count) = user_ips.get_mut(&ip) {
if *count > pending_count {
*count -= pending_count;
} else {
if user_ips.remove(&ip).is_some() {
removed_active_entries = removed_active_entries.saturating_add(1);
}
}
}
if user_ips.is_empty() {
active_ips.remove(&user);
}
}
removed_active_entries = removed_active_entries.saturating_add(
Self::apply_active_cleanup(&mut active_ips, &user, ip, pending_count),
);
}
Self::decrement_counter(&self.active_entry_count, removed_active_entries);
}
@@ -290,6 +335,8 @@ impl UserIpTracker {
cleanup_queue_len,
active_cap_rejects: self.active_cap_rejects.load(Ordering::Relaxed),
recent_cap_rejects: self.recent_cap_rejects.load(Ordering::Relaxed),
cleanup_direct_releases: self.cleanup_direct_releases.load(Ordering::Relaxed),
cleanup_deferred_releases: self.cleanup_deferred_releases.load(Ordering::Relaxed),
}
}

View File

@@ -3123,6 +3123,21 @@ async fn render_metrics(
"telemt_ip_tracker_cleanup_queue_len {}",
ip_memory.cleanup_queue_len
);
let _ = writeln!(
out,
"# HELP telemt_ip_tracker_cleanup_total Release cleanup decisions by path"
);
let _ = writeln!(out, "# TYPE telemt_ip_tracker_cleanup_total counter");
let _ = writeln!(
out,
"telemt_ip_tracker_cleanup_total{{path=\"direct\"}} {}",
ip_memory.cleanup_direct_releases
);
let _ = writeln!(
out,
"telemt_ip_tracker_cleanup_total{{path=\"deferred\"}} {}",
ip_memory.cleanup_deferred_releases
);
let _ = writeln!(
out,
"# HELP telemt_ip_tracker_cap_rejects_total New connection rejects caused by global IP tracker caps"
@@ -3476,6 +3491,7 @@ mod tests {
assert!(output.contains("# TYPE telemt_ip_tracker_users gauge"));
assert!(output.contains("# TYPE telemt_ip_tracker_entries gauge"));
assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_queue_len gauge"));
assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_total counter"));
assert!(output.contains("# TYPE telemt_ip_tracker_cap_rejects_total counter"));
assert!(output.contains("# TYPE telemt_tls_fetch_profile_cache_entries gauge"));
assert!(output.contains("# TYPE telemt_tls_fetch_profile_cache_cap_drops_total counter"));

View File

@@ -73,7 +73,8 @@ impl Drop for UserConnectionReservation {
self.active = false;
self.stats.decrement_user_curr_connects(&self.user);
if self.tracks_ip {
self.ip_tracker.enqueue_cleanup(self.user.clone(), self.ip);
self.ip_tracker
.release_or_enqueue_cleanup(self.user.clone(), self.ip);
}
}
}