From 565b4ee923be5821f1c69624d5316c5423174d57 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 5 Mar 2026 13:21:11 +0300 Subject: [PATCH 1/3] Unique IP always in Metrics+API Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/api/model.rs | 1 + src/api/users.rs | 7 +++-- src/ip_tracker.rs | 77 ++++++++++++++++++++++++----------------------- src/metrics.rs | 21 ++++++++++++- 4 files changed, 66 insertions(+), 40 deletions(-) diff --git a/src/api/model.rs b/src/api/model.rs index efe8ebb..09eebdc 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -369,6 +369,7 @@ pub(super) struct UserInfo { pub(super) max_unique_ips: Option, pub(super) current_connections: u64, pub(super) active_unique_ips: usize, + pub(super) recent_unique_ips: usize, pub(super) total_octets: u64, pub(super) links: UserLinks, } diff --git a/src/api/users.rs b/src/api/users.rs index 8e90c7f..32823f1 100644 --- a/src/api/users.rs +++ b/src/api/users.rs @@ -112,6 +112,7 @@ pub(super) async fn create_user( max_unique_ips: updated_limit, current_connections: 0, active_unique_ips: 0, + recent_unique_ips: 0, total_octets: 0, links: build_user_links( &cfg, @@ -300,7 +301,7 @@ pub(super) async fn users_from_config( startup_detected_ip_v4: Option, startup_detected_ip_v6: Option, ) -> Vec { - let ip_counts = ip_tracker + let active_ip_counts = ip_tracker .get_stats() .await .into_iter() @@ -309,6 +310,7 @@ pub(super) async fn users_from_config( let mut names = cfg.access.users.keys().cloned().collect::>(); names.sort(); + let recent_ip_counts = ip_tracker.get_recent_counts_for_users(&names).await; let mut users = Vec::with_capacity(names.len()); for username in names { @@ -340,7 +342,8 @@ pub(super) async fn users_from_config( data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(), max_unique_ips: cfg.access.user_max_unique_ips.get(&username).copied(), current_connections: stats.get_user_curr_connects(&username), - active_unique_ips: ip_counts.get(&username).copied().unwrap_or(0), + active_unique_ips: active_ip_counts.get(&username).copied().unwrap_or(0), + recent_unique_ips: recent_ip_counts.get(&username).copied().unwrap_or(0), total_octets: stats.get_user_total_octets(&username), links, username, diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs index 626d591..3ff6042 100644 --- a/src/ip_tracker.rs +++ b/src/ip_tracker.rs @@ -67,22 +67,15 @@ impl UserIpTracker { let max_ips = self.max_ips.read().await; max_ips.get(username).copied() }; + let mode = *self.limit_mode.read().await; + let window = *self.limit_window.read().await; + let now = Instant::now(); let mut active_ips = self.active_ips.write().await; let user_active = active_ips .entry(username.to_string()) .or_insert_with(HashSet::new); - if limit.is_none() { - user_active.insert(ip); - return Ok(()); - } - - let limit = limit.unwrap_or_default(); - let mode = *self.limit_mode.read().await; - let window = *self.limit_window.read().await; - let now = Instant::now(); - let mut recent_ips = self.recent_ips.write().await; let user_recent = recent_ips .entry(username.to_string()) @@ -94,24 +87,26 @@ impl UserIpTracker { return Ok(()); } - let active_limit_reached = user_active.len() >= limit; - let recent_limit_reached = user_recent.len() >= limit; - let deny = match mode { - UserMaxUniqueIpsMode::ActiveWindow => active_limit_reached, - UserMaxUniqueIpsMode::TimeWindow => recent_limit_reached, - UserMaxUniqueIpsMode::Combined => active_limit_reached || recent_limit_reached, - }; + if let Some(limit) = limit { + let active_limit_reached = user_active.len() >= limit; + let recent_limit_reached = user_recent.len() >= limit; + let deny = match mode { + UserMaxUniqueIpsMode::ActiveWindow => active_limit_reached, + UserMaxUniqueIpsMode::TimeWindow => recent_limit_reached, + UserMaxUniqueIpsMode::Combined => active_limit_reached || recent_limit_reached, + }; - if deny { - return Err(format!( - "IP limit reached for user '{}': active={}/{} recent={}/{} mode={:?}", - username, - user_active.len(), - limit, - user_recent.len(), - limit, - mode - )); + if deny { + return Err(format!( + "IP limit reached for user '{}': active={}/{} recent={}/{} mode={:?}", + username, + user_active.len(), + limit, + user_recent.len(), + limit, + mode + )); + } } user_active.insert(ip); @@ -127,18 +122,26 @@ impl UserIpTracker { active_ips.remove(username); } } - drop(active_ips); + } - let mode = *self.limit_mode.read().await; - if matches!(mode, UserMaxUniqueIpsMode::ActiveWindow) { - let mut recent_ips = self.recent_ips.write().await; - if let Some(user_recent) = recent_ips.get_mut(username) { - user_recent.remove(&ip); - if user_recent.is_empty() { - recent_ips.remove(username); - } - } + pub async fn get_recent_counts_for_users(&self, users: &[String]) -> HashMap { + let window = *self.limit_window.read().await; + let now = Instant::now(); + let mut recent_ips = self.recent_ips.write().await; + + let mut counts = HashMap::with_capacity(users.len()); + for user in users { + let count = if let Some(user_recent) = recent_ips.get_mut(user) { + Self::prune_recent(user_recent, now, window); + user_recent.len() + } else { + 0 + }; + counts.insert(user.clone(), count); } + + recent_ips.retain(|_, user_recent| !user_recent.is_empty()); + counts } pub async fn get_active_ip_count(&self, username: &str) -> usize { diff --git a/src/metrics.rs b/src/metrics.rs index eae69d1..1595445 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1267,11 +1267,21 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp .collect(); let mut unique_users = BTreeSet::new(); + unique_users.extend(config.access.users.keys().cloned()); unique_users.extend(config.access.user_max_unique_ips.keys().cloned()); unique_users.extend(ip_counts.keys().cloned()); + let unique_users_vec: Vec = unique_users.iter().cloned().collect(); + let recent_counts = ip_tracker + .get_recent_counts_for_users(&unique_users_vec) + .await; let _ = writeln!(out, "# HELP telemt_user_unique_ips_current Per-user current number of unique active IPs"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_current gauge"); + let _ = writeln!( + out, + "# HELP telemt_user_unique_ips_recent_window Per-user unique IPs seen in configured observation window" + ); + let _ = writeln!(out, "# TYPE telemt_user_unique_ips_recent_window gauge"); let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge"); let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)"); @@ -1286,6 +1296,12 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp 0.0 }; let _ = writeln!(out, "telemt_user_unique_ips_current{{user=\"{}\"}} {}", user, current); + let _ = writeln!( + out, + "telemt_user_unique_ips_recent_window{{user=\"{}\"}} {}", + user, + recent_counts.get(&user).copied().unwrap_or(0) + ); let _ = writeln!(out, "telemt_user_unique_ips_limit{{user=\"{}\"}} {}", user, limit); let _ = writeln!( out, @@ -1378,6 +1394,7 @@ mod tests { assert!(output.contains("telemt_user_msgs_from_client{user=\"alice\"} 1")); assert!(output.contains("telemt_user_msgs_to_client{user=\"alice\"} 2")); assert!(output.contains("telemt_user_unique_ips_current{user=\"alice\"} 1")); + assert!(output.contains("telemt_user_unique_ips_recent_window{user=\"alice\"} 1")); assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 4")); assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.250000")); } @@ -1391,7 +1408,8 @@ mod tests { assert!(output.contains("telemt_connections_total 0")); assert!(output.contains("telemt_connections_bad_total 0")); assert!(output.contains("telemt_handshake_timeouts_total 0")); - assert!(!output.contains("user=")); + assert!(output.contains("telemt_user_unique_ips_current{user=")); + assert!(output.contains("telemt_user_unique_ips_recent_window{user=")); } #[tokio::test] @@ -1412,6 +1430,7 @@ mod tests { "# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge" )); assert!(output.contains("# TYPE telemt_user_unique_ips_current gauge")); + assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge")); assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge")); assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge")); } From 0b1a8cd3f82dec0b20c01b2fbdd24244264ec0b2 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 5 Mar 2026 13:41:41 +0300 Subject: [PATCH 2/3] IP Limit fixes --- src/api/model.rs | 4 ++ src/api/users.rs | 27 +++++++------ src/ip_tracker.rs | 96 +++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 108 insertions(+), 19 deletions(-) diff --git a/src/api/model.rs b/src/api/model.rs index 09eebdc..2f6c58e 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -1,3 +1,5 @@ +use std::net::IpAddr; + use chrono::{DateTime, Utc}; use hyper::StatusCode; use rand::Rng; @@ -369,7 +371,9 @@ pub(super) struct UserInfo { pub(super) max_unique_ips: Option, pub(super) current_connections: u64, pub(super) active_unique_ips: usize, + pub(super) active_unique_ips_list: Vec, pub(super) recent_unique_ips: usize, + pub(super) recent_unique_ips_list: Vec, pub(super) total_octets: u64, pub(super) links: UserLinks, } diff --git a/src/api/users.rs b/src/api/users.rs index 32823f1..d156896 100644 --- a/src/api/users.rs +++ b/src/api/users.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::net::IpAddr; use hyper::StatusCode; @@ -112,7 +111,9 @@ pub(super) async fn create_user( max_unique_ips: updated_limit, current_connections: 0, active_unique_ips: 0, + active_unique_ips_list: Vec::new(), recent_unique_ips: 0, + recent_unique_ips_list: Vec::new(), total_octets: 0, links: build_user_links( &cfg, @@ -301,19 +302,21 @@ pub(super) async fn users_from_config( startup_detected_ip_v4: Option, startup_detected_ip_v6: Option, ) -> Vec { - let active_ip_counts = ip_tracker - .get_stats() - .await - .into_iter() - .map(|(user, count, _)| (user, count)) - .collect::>(); - let mut names = cfg.access.users.keys().cloned().collect::>(); names.sort(); - let recent_ip_counts = ip_tracker.get_recent_counts_for_users(&names).await; + let active_ip_lists = ip_tracker.get_active_ips_for_users(&names).await; + let recent_ip_lists = ip_tracker.get_recent_ips_for_users(&names).await; let mut users = Vec::with_capacity(names.len()); for username in names { + let active_ip_list = active_ip_lists + .get(&username) + .cloned() + .unwrap_or_else(Vec::new); + let recent_ip_list = recent_ip_lists + .get(&username) + .cloned() + .unwrap_or_else(Vec::new); let links = cfg .access .users @@ -342,8 +345,10 @@ pub(super) async fn users_from_config( data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(), max_unique_ips: cfg.access.user_max_unique_ips.get(&username).copied(), current_connections: stats.get_user_curr_connects(&username), - active_unique_ips: active_ip_counts.get(&username).copied().unwrap_or(0), - recent_unique_ips: recent_ip_counts.get(&username).copied().unwrap_or(0), + active_unique_ips: active_ip_list.len(), + active_unique_ips_list: active_ip_list, + recent_unique_ips: recent_ip_list.len(), + recent_unique_ips_list: recent_ip_list, total_octets: stats.get_user_total_octets(&username), links, username, diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs index 3ff6042..492d642 100644 --- a/src/ip_tracker.rs +++ b/src/ip_tracker.rs @@ -2,7 +2,7 @@ #![allow(dead_code)] -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::net::IpAddr; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -13,7 +13,7 @@ use crate::config::UserMaxUniqueIpsMode; #[derive(Debug, Clone)] pub struct UserIpTracker { - active_ips: Arc>>>, + active_ips: Arc>>>, recent_ips: Arc>>>, max_ips: Arc>>, limit_mode: Arc>, @@ -74,7 +74,7 @@ impl UserIpTracker { let mut active_ips = self.active_ips.write().await; let user_active = active_ips .entry(username.to_string()) - .or_insert_with(HashSet::new); + .or_insert_with(HashMap::new); let mut recent_ips = self.recent_ips.write().await; let user_recent = recent_ips @@ -82,7 +82,8 @@ impl UserIpTracker { .or_insert_with(HashMap::new); Self::prune_recent(user_recent, now, window); - if user_active.contains(&ip) { + if let Some(count) = user_active.get_mut(&ip) { + *count = count.saturating_add(1); user_recent.insert(ip, now); return Ok(()); } @@ -109,7 +110,7 @@ impl UserIpTracker { } } - user_active.insert(ip); + user_active.insert(ip, 1); user_recent.insert(ip, now); Ok(()) } @@ -117,7 +118,13 @@ impl UserIpTracker { pub async fn remove_ip(&self, username: &str, ip: IpAddr) { let mut active_ips = self.active_ips.write().await; if let Some(user_ips) = active_ips.get_mut(username) { - user_ips.remove(&ip); + if let Some(count) = user_ips.get_mut(&ip) { + if *count > 1 { + *count -= 1; + } else { + user_ips.remove(&ip); + } + } if user_ips.is_empty() { active_ips.remove(username); } @@ -144,6 +151,41 @@ impl UserIpTracker { counts } + pub async fn get_active_ips_for_users(&self, users: &[String]) -> HashMap> { + let active_ips = self.active_ips.read().await; + let mut out = HashMap::with_capacity(users.len()); + for user in users { + let mut ips = active_ips + .get(user) + .map(|per_ip| per_ip.keys().copied().collect::>()) + .unwrap_or_else(Vec::new); + ips.sort(); + out.insert(user.clone(), ips); + } + out + } + + pub async fn get_recent_ips_for_users(&self, users: &[String]) -> HashMap> { + let window = *self.limit_window.read().await; + let now = Instant::now(); + let mut recent_ips = self.recent_ips.write().await; + + let mut out = HashMap::with_capacity(users.len()); + for user in users { + let mut ips = if let Some(user_recent) = recent_ips.get_mut(user) { + Self::prune_recent(user_recent, now, window); + user_recent.keys().copied().collect::>() + } else { + Vec::new() + }; + ips.sort(); + out.insert(user.clone(), ips); + } + + recent_ips.retain(|_, user_recent| !user_recent.is_empty()); + out + } + pub async fn get_active_ip_count(&self, username: &str) -> usize { let active_ips = self.active_ips.read().await; active_ips.get(username).map(|ips| ips.len()).unwrap_or(0) @@ -153,7 +195,7 @@ impl UserIpTracker { let active_ips = self.active_ips.read().await; active_ips .get(username) - .map(|ips| ips.iter().copied().collect()) + .map(|ips| ips.keys().copied().collect()) .unwrap_or_else(Vec::new) } @@ -193,7 +235,7 @@ impl UserIpTracker { let active_ips = self.active_ips.read().await; active_ips .get(username) - .map(|ips| ips.contains(&ip)) + .map(|ips| ips.contains_key(&ip)) .unwrap_or(false) } @@ -269,6 +311,26 @@ mod tests { assert_eq!(tracker.get_active_ip_count("test_user").await, 2); } + #[tokio::test] + async fn test_active_window_rejects_new_ip_and_keeps_existing_session() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("test_user", 1).await; + tracker + .set_limit_policy(UserMaxUniqueIpsMode::ActiveWindow, 30) + .await; + + let ip1 = test_ipv4(10, 10, 10, 1); + let ip2 = test_ipv4(10, 10, 10, 2); + + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + assert!(tracker.is_ip_active("test_user", ip1).await); + assert!(tracker.check_and_add("test_user", ip2).await.is_err()); + + // Existing session remains active; only new unique IP is denied. + assert!(tracker.is_ip_active("test_user", ip1).await); + assert_eq!(tracker.get_active_ip_count("test_user").await, 1); + } + #[tokio::test] async fn test_reconnection_from_same_ip() { let tracker = UserIpTracker::new(); @@ -281,6 +343,24 @@ mod tests { assert_eq!(tracker.get_active_ip_count("test_user").await, 1); } + #[tokio::test] + async fn test_same_ip_disconnect_keeps_active_while_other_session_alive() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("test_user", 2).await; + + let ip1 = test_ipv4(192, 168, 1, 1); + + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + assert_eq!(tracker.get_active_ip_count("test_user").await, 1); + + tracker.remove_ip("test_user", ip1).await; + assert_eq!(tracker.get_active_ip_count("test_user").await, 1); + + tracker.remove_ip("test_user", ip1).await; + assert_eq!(tracker.get_active_ip_count("test_user").await, 0); + } + #[tokio::test] async fn test_ip_removal() { let tracker = UserIpTracker::new(); From 83cadc0bf348aeaebc06322af8b7a092c4984826 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 5 Mar 2026 13:52:27 +0300 Subject: [PATCH 3/3] No lock-contention in ip-tracker --- src/ip_tracker.rs | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs index 492d642..5da8222 100644 --- a/src/ip_tracker.rs +++ b/src/ip_tracker.rs @@ -134,20 +134,20 @@ impl UserIpTracker { pub async fn get_recent_counts_for_users(&self, users: &[String]) -> HashMap { let window = *self.limit_window.read().await; let now = Instant::now(); - let mut recent_ips = self.recent_ips.write().await; + let recent_ips = self.recent_ips.read().await; let mut counts = HashMap::with_capacity(users.len()); for user in users { - let count = if let Some(user_recent) = recent_ips.get_mut(user) { - Self::prune_recent(user_recent, now, window); - user_recent.len() + let count = if let Some(user_recent) = recent_ips.get(user) { + user_recent + .values() + .filter(|seen_at| now.duration_since(**seen_at) <= window) + .count() } else { 0 }; counts.insert(user.clone(), count); } - - recent_ips.retain(|_, user_recent| !user_recent.is_empty()); counts } @@ -168,21 +168,22 @@ impl UserIpTracker { pub async fn get_recent_ips_for_users(&self, users: &[String]) -> HashMap> { let window = *self.limit_window.read().await; let now = Instant::now(); - let mut recent_ips = self.recent_ips.write().await; + let recent_ips = self.recent_ips.read().await; let mut out = HashMap::with_capacity(users.len()); for user in users { - let mut ips = if let Some(user_recent) = recent_ips.get_mut(user) { - Self::prune_recent(user_recent, now, window); - user_recent.keys().copied().collect::>() + let mut ips = if let Some(user_recent) = recent_ips.get(user) { + user_recent + .iter() + .filter(|(_, seen_at)| now.duration_since(**seen_at) <= window) + .map(|(ip, _)| *ip) + .collect::>() } else { Vec::new() }; ips.sort(); out.insert(user.clone(), ips); } - - recent_ips.retain(|_, user_recent| !user_recent.is_empty()); out }