Unique IP always in Metrics+API

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-05 13:21:11 +03:00
parent 7a9c1e79c2
commit 565b4ee923
No known key found for this signature in database
4 changed files with 66 additions and 40 deletions

View File

@ -369,6 +369,7 @@ pub(super) struct UserInfo {
pub(super) max_unique_ips: Option<usize>, pub(super) max_unique_ips: Option<usize>,
pub(super) current_connections: u64, pub(super) current_connections: u64,
pub(super) active_unique_ips: usize, pub(super) active_unique_ips: usize,
pub(super) recent_unique_ips: usize,
pub(super) total_octets: u64, pub(super) total_octets: u64,
pub(super) links: UserLinks, pub(super) links: UserLinks,
} }

View File

@ -112,6 +112,7 @@ pub(super) async fn create_user(
max_unique_ips: updated_limit, max_unique_ips: updated_limit,
current_connections: 0, current_connections: 0,
active_unique_ips: 0, active_unique_ips: 0,
recent_unique_ips: 0,
total_octets: 0, total_octets: 0,
links: build_user_links( links: build_user_links(
&cfg, &cfg,
@ -300,7 +301,7 @@ pub(super) async fn users_from_config(
startup_detected_ip_v4: Option<IpAddr>, startup_detected_ip_v4: Option<IpAddr>,
startup_detected_ip_v6: Option<IpAddr>, startup_detected_ip_v6: Option<IpAddr>,
) -> Vec<UserInfo> { ) -> Vec<UserInfo> {
let ip_counts = ip_tracker let active_ip_counts = ip_tracker
.get_stats() .get_stats()
.await .await
.into_iter() .into_iter()
@ -309,6 +310,7 @@ pub(super) async fn users_from_config(
let mut names = cfg.access.users.keys().cloned().collect::<Vec<_>>(); let mut names = cfg.access.users.keys().cloned().collect::<Vec<_>>();
names.sort(); names.sort();
let recent_ip_counts = ip_tracker.get_recent_counts_for_users(&names).await;
let mut users = Vec::with_capacity(names.len()); let mut users = Vec::with_capacity(names.len());
for username in names { for username in names {
@ -340,7 +342,8 @@ pub(super) async fn users_from_config(
data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(), data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(),
max_unique_ips: cfg.access.user_max_unique_ips.get(&username).copied(), max_unique_ips: cfg.access.user_max_unique_ips.get(&username).copied(),
current_connections: stats.get_user_curr_connects(&username), current_connections: stats.get_user_curr_connects(&username),
active_unique_ips: ip_counts.get(&username).copied().unwrap_or(0), active_unique_ips: active_ip_counts.get(&username).copied().unwrap_or(0),
recent_unique_ips: recent_ip_counts.get(&username).copied().unwrap_or(0),
total_octets: stats.get_user_total_octets(&username), total_octets: stats.get_user_total_octets(&username),
links, links,
username, username,

View File

@ -67,22 +67,15 @@ impl UserIpTracker {
let max_ips = self.max_ips.read().await; let max_ips = self.max_ips.read().await;
max_ips.get(username).copied() max_ips.get(username).copied()
}; };
let mode = *self.limit_mode.read().await;
let window = *self.limit_window.read().await;
let now = Instant::now();
let mut active_ips = self.active_ips.write().await; let mut active_ips = self.active_ips.write().await;
let user_active = active_ips let user_active = active_ips
.entry(username.to_string()) .entry(username.to_string())
.or_insert_with(HashSet::new); .or_insert_with(HashSet::new);
if limit.is_none() {
user_active.insert(ip);
return Ok(());
}
let limit = limit.unwrap_or_default();
let mode = *self.limit_mode.read().await;
let window = *self.limit_window.read().await;
let now = Instant::now();
let mut recent_ips = self.recent_ips.write().await; let mut recent_ips = self.recent_ips.write().await;
let user_recent = recent_ips let user_recent = recent_ips
.entry(username.to_string()) .entry(username.to_string())
@ -94,6 +87,7 @@ impl UserIpTracker {
return Ok(()); return Ok(());
} }
if let Some(limit) = limit {
let active_limit_reached = user_active.len() >= limit; let active_limit_reached = user_active.len() >= limit;
let recent_limit_reached = user_recent.len() >= limit; let recent_limit_reached = user_recent.len() >= limit;
let deny = match mode { let deny = match mode {
@ -113,6 +107,7 @@ impl UserIpTracker {
mode mode
)); ));
} }
}
user_active.insert(ip); user_active.insert(ip);
user_recent.insert(ip, now); user_recent.insert(ip, now);
@ -127,18 +122,26 @@ impl UserIpTracker {
active_ips.remove(username); active_ips.remove(username);
} }
} }
drop(active_ips); }
let mode = *self.limit_mode.read().await; pub async fn get_recent_counts_for_users(&self, users: &[String]) -> HashMap<String, usize> {
if matches!(mode, UserMaxUniqueIpsMode::ActiveWindow) { let window = *self.limit_window.read().await;
let now = Instant::now();
let mut recent_ips = self.recent_ips.write().await; let mut recent_ips = self.recent_ips.write().await;
if let Some(user_recent) = recent_ips.get_mut(username) {
user_recent.remove(&ip); let mut counts = HashMap::with_capacity(users.len());
if user_recent.is_empty() { for user in users {
recent_ips.remove(username); let count = if let Some(user_recent) = recent_ips.get_mut(user) {
} Self::prune_recent(user_recent, now, window);
} user_recent.len()
} else {
0
};
counts.insert(user.clone(), count);
} }
recent_ips.retain(|_, user_recent| !user_recent.is_empty());
counts
} }
pub async fn get_active_ip_count(&self, username: &str) -> usize { pub async fn get_active_ip_count(&self, username: &str) -> usize {

View File

@ -1267,11 +1267,21 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
.collect(); .collect();
let mut unique_users = BTreeSet::new(); let mut unique_users = BTreeSet::new();
unique_users.extend(config.access.users.keys().cloned());
unique_users.extend(config.access.user_max_unique_ips.keys().cloned()); unique_users.extend(config.access.user_max_unique_ips.keys().cloned());
unique_users.extend(ip_counts.keys().cloned()); unique_users.extend(ip_counts.keys().cloned());
let unique_users_vec: Vec<String> = unique_users.iter().cloned().collect();
let recent_counts = ip_tracker
.get_recent_counts_for_users(&unique_users_vec)
.await;
let _ = writeln!(out, "# HELP telemt_user_unique_ips_current Per-user current number of unique active IPs"); let _ = writeln!(out, "# HELP telemt_user_unique_ips_current Per-user current number of unique active IPs");
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_current gauge"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_current gauge");
let _ = writeln!(
out,
"# HELP telemt_user_unique_ips_recent_window Per-user unique IPs seen in configured observation window"
);
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_recent_window gauge");
let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)"); let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)");
let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge");
let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)"); let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)");
@ -1286,6 +1296,12 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
0.0 0.0
}; };
let _ = writeln!(out, "telemt_user_unique_ips_current{{user=\"{}\"}} {}", user, current); let _ = writeln!(out, "telemt_user_unique_ips_current{{user=\"{}\"}} {}", user, current);
let _ = writeln!(
out,
"telemt_user_unique_ips_recent_window{{user=\"{}\"}} {}",
user,
recent_counts.get(&user).copied().unwrap_or(0)
);
let _ = writeln!(out, "telemt_user_unique_ips_limit{{user=\"{}\"}} {}", user, limit); let _ = writeln!(out, "telemt_user_unique_ips_limit{{user=\"{}\"}} {}", user, limit);
let _ = writeln!( let _ = writeln!(
out, out,
@ -1378,6 +1394,7 @@ mod tests {
assert!(output.contains("telemt_user_msgs_from_client{user=\"alice\"} 1")); assert!(output.contains("telemt_user_msgs_from_client{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_msgs_to_client{user=\"alice\"} 2")); assert!(output.contains("telemt_user_msgs_to_client{user=\"alice\"} 2"));
assert!(output.contains("telemt_user_unique_ips_current{user=\"alice\"} 1")); assert!(output.contains("telemt_user_unique_ips_current{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_unique_ips_recent_window{user=\"alice\"} 1"));
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 4")); assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 4"));
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.250000")); assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.250000"));
} }
@ -1391,7 +1408,8 @@ mod tests {
assert!(output.contains("telemt_connections_total 0")); assert!(output.contains("telemt_connections_total 0"));
assert!(output.contains("telemt_connections_bad_total 0")); assert!(output.contains("telemt_connections_bad_total 0"));
assert!(output.contains("telemt_handshake_timeouts_total 0")); assert!(output.contains("telemt_handshake_timeouts_total 0"));
assert!(!output.contains("user=")); assert!(output.contains("telemt_user_unique_ips_current{user="));
assert!(output.contains("telemt_user_unique_ips_recent_window{user="));
} }
#[tokio::test] #[tokio::test]
@ -1412,6 +1430,7 @@ mod tests {
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge" "# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
)); ));
assert!(output.contains("# TYPE telemt_user_unique_ips_current gauge")); assert!(output.contains("# TYPE telemt_user_unique_ips_current gauge"));
assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge"));
assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge")); assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge")); assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
} }