Cap scanner-sensitive Caches and IP-Tracker Cardinality

This commit is contained in:
Alexey
2026-04-30 10:43:27 +03:00
parent 876b74ebf7
commit 1f90e28871
4 changed files with 265 additions and 14 deletions

View File

@@ -14,11 +14,18 @@ use tokio::sync::{Mutex as AsyncMutex, RwLock, RwLockWriteGuard};
use crate::config::UserMaxUniqueIpsMode;
const CLEANUP_DRAIN_BATCH_LIMIT: usize = 1024;
const MAX_ACTIVE_IP_ENTRIES: u64 = 131_072;
const MAX_RECENT_IP_ENTRIES: u64 = 262_144;
/// Tracks active and recent client IPs for per-user admission control.
#[derive(Debug, Clone)]
pub struct UserIpTracker {
active_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, usize>>>>,
recent_ips: Arc<RwLock<HashMap<String, HashMap<IpAddr, Instant>>>>,
active_entry_count: Arc<AtomicU64>,
recent_entry_count: Arc<AtomicU64>,
active_cap_rejects: Arc<AtomicU64>,
recent_cap_rejects: Arc<AtomicU64>,
max_ips: Arc<RwLock<HashMap<String, usize>>>,
default_max_ips: Arc<RwLock<usize>>,
limit_mode: Arc<RwLock<UserMaxUniqueIpsMode>>,
@@ -28,13 +35,23 @@ pub struct UserIpTracker {
cleanup_drain_lock: Arc<AsyncMutex<()>>,
}
/// Point-in-time memory counters for user/IP limiter state.
#[derive(Debug, Clone, Copy)]
pub struct UserIpTrackerMemoryStats {
/// Number of users with active IP state.
pub active_users: usize,
/// Number of users with recent IP state.
pub recent_users: usize,
/// Number of active `(user, ip)` entries.
pub active_entries: usize,
/// Number of recent-window `(user, ip)` entries.
pub recent_entries: usize,
/// Number of deferred disconnect cleanups waiting to be drained.
pub cleanup_queue_len: usize,
/// Number of new connections rejected by the global active-entry cap.
pub active_cap_rejects: u64,
/// Number of new connections rejected by the global recent-entry cap.
pub recent_cap_rejects: u64,
}
impl UserIpTracker {
@@ -42,6 +59,10 @@ impl UserIpTracker {
Self {
active_ips: Arc::new(RwLock::new(HashMap::new())),
recent_ips: Arc::new(RwLock::new(HashMap::new())),
active_entry_count: Arc::new(AtomicU64::new(0)),
recent_entry_count: Arc::new(AtomicU64::new(0)),
active_cap_rejects: Arc::new(AtomicU64::new(0)),
recent_cap_rejects: Arc::new(AtomicU64::new(0)),
max_ips: Arc::new(RwLock::new(HashMap::new())),
default_max_ips: Arc::new(RwLock::new(0)),
limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)),
@@ -52,6 +73,16 @@ impl UserIpTracker {
}
}
fn decrement_counter(counter: &AtomicU64, amount: usize) {
if amount == 0 {
return;
}
let amount = amount as u64;
let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
Some(current.saturating_sub(amount))
});
}
pub fn enqueue_cleanup(&self, user: String, ip: IpAddr) {
match self.cleanup_queue.lock() {
Ok(mut queue) => {
@@ -136,6 +167,7 @@ impl UserIpTracker {
}
let mut active_ips = self.active_ips.write().await;
let mut removed_active_entries = 0usize;
for ((user, ip), pending_count) in to_remove {
if pending_count == 0 {
continue;
@@ -145,7 +177,9 @@ impl UserIpTracker {
if *count > pending_count {
*count -= pending_count;
} else {
user_ips.remove(&ip);
if user_ips.remove(&ip).is_some() {
removed_active_entries = removed_active_entries.saturating_add(1);
}
}
}
if user_ips.is_empty() {
@@ -153,6 +187,7 @@ impl UserIpTracker {
}
}
}
Self::decrement_counter(&self.active_entry_count, removed_active_entries);
}
fn now_epoch_secs() -> u64 {
@@ -204,9 +239,12 @@ impl UserIpTracker {
let now = Instant::now();
let (mut active_ips, mut recent_ips) = self.active_and_recent_write().await;
let mut pruned_recent_entries = 0usize;
for user_recent in recent_ips.values_mut() {
Self::prune_recent(user_recent, now, window);
pruned_recent_entries =
pruned_recent_entries.saturating_add(Self::prune_recent(user_recent, now, window));
}
Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries);
let mut users =
Vec::<String>::with_capacity(active_ips.len().saturating_add(recent_ips.len()));
@@ -250,6 +288,8 @@ impl UserIpTracker {
active_entries,
recent_entries,
cleanup_queue_len,
active_cap_rejects: self.active_cap_rejects.load(Ordering::Relaxed),
recent_cap_rejects: self.recent_cap_rejects.load(Ordering::Relaxed),
}
}
@@ -280,11 +320,17 @@ impl UserIpTracker {
max_ips.clone_from(limits);
}
fn prune_recent(user_recent: &mut HashMap<IpAddr, Instant>, now: Instant, window: Duration) {
fn prune_recent(
user_recent: &mut HashMap<IpAddr, Instant>,
now: Instant,
window: Duration,
) -> usize {
if user_recent.is_empty() {
return;
return 0;
}
let before = user_recent.len();
user_recent.retain(|_, seen_at| now.duration_since(*seen_at) <= window);
before.saturating_sub(user_recent.len())
}
pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> {
@@ -310,15 +356,29 @@ impl UserIpTracker {
let user_recent = recent_ips
.entry(username.to_string())
.or_insert_with(HashMap::new);
Self::prune_recent(user_recent, now, window);
let pruned_recent_entries = Self::prune_recent(user_recent, now, window);
Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries);
let recent_contains_ip = user_recent.contains_key(&ip);
if let Some(count) = user_active.get_mut(&ip) {
if !recent_contains_ip
&& self.recent_entry_count.load(Ordering::Relaxed) >= MAX_RECENT_IP_ENTRIES
{
self.recent_cap_rejects.fetch_add(1, Ordering::Relaxed);
return Err(format!(
"IP tracker recent entry cap reached: entries={}/{}",
self.recent_entry_count.load(Ordering::Relaxed),
MAX_RECENT_IP_ENTRIES
));
}
*count = count.saturating_add(1);
user_recent.insert(ip, now);
if user_recent.insert(ip, now).is_none() {
self.recent_entry_count.fetch_add(1, Ordering::Relaxed);
}
return Ok(());
}
let is_new_ip = !user_recent.contains_key(&ip);
let is_new_ip = !recent_contains_ip;
if let Some(limit) = limit {
let active_limit_reached = user_active.len() >= limit;
@@ -342,26 +402,51 @@ impl UserIpTracker {
}
}
user_active.insert(ip, 1);
user_recent.insert(ip, now);
if self.active_entry_count.load(Ordering::Relaxed) >= MAX_ACTIVE_IP_ENTRIES {
self.active_cap_rejects.fetch_add(1, Ordering::Relaxed);
return Err(format!(
"IP tracker active entry cap reached: entries={}/{}",
self.active_entry_count.load(Ordering::Relaxed),
MAX_ACTIVE_IP_ENTRIES
));
}
if is_new_ip && self.recent_entry_count.load(Ordering::Relaxed) >= MAX_RECENT_IP_ENTRIES {
self.recent_cap_rejects.fetch_add(1, Ordering::Relaxed);
return Err(format!(
"IP tracker recent entry cap reached: entries={}/{}",
self.recent_entry_count.load(Ordering::Relaxed),
MAX_RECENT_IP_ENTRIES
));
}
if user_active.insert(ip, 1).is_none() {
self.active_entry_count.fetch_add(1, Ordering::Relaxed);
}
if user_recent.insert(ip, now).is_none() {
self.recent_entry_count.fetch_add(1, Ordering::Relaxed);
}
Ok(())
}
pub async fn remove_ip(&self, username: &str, ip: IpAddr) {
self.maybe_compact_empty_users().await;
let mut active_ips = self.active_ips.write().await;
let mut removed_active_entries = 0usize;
if let Some(user_ips) = active_ips.get_mut(username) {
if let Some(count) = user_ips.get_mut(&ip) {
if *count > 1 {
*count -= 1;
} else {
user_ips.remove(&ip);
if user_ips.remove(&ip).is_some() {
removed_active_entries = 1;
}
}
}
if user_ips.is_empty() {
active_ips.remove(username);
}
}
Self::decrement_counter(&self.active_entry_count, removed_active_entries);
}
pub async fn get_recent_counts_for_users(&self, users: &[String]) -> HashMap<String, usize> {
@@ -478,20 +563,27 @@ impl UserIpTracker {
pub async fn clear_user_ips(&self, username: &str) {
let mut active_ips = self.active_ips.write().await;
active_ips.remove(username);
let removed_active_entries = active_ips
.remove(username)
.map(|ips| ips.len())
.unwrap_or(0);
drop(active_ips);
Self::decrement_counter(&self.active_entry_count, removed_active_entries);
let mut recent_ips = self.recent_ips.write().await;
recent_ips.remove(username);
let removed_recent_entries = recent_ips.remove(username).map(|ips| ips.len()).unwrap_or(0);
Self::decrement_counter(&self.recent_entry_count, removed_recent_entries);
}
pub async fn clear_all(&self) {
let mut active_ips = self.active_ips.write().await;
active_ips.clear();
drop(active_ips);
self.active_entry_count.store(0, Ordering::Relaxed);
let mut recent_ips = self.recent_ips.write().await;
recent_ips.clear();
self.recent_entry_count.store(0, Ordering::Relaxed);
}
pub async fn is_ip_active(&self, username: &str, ip: IpAddr) -> bool {