From 4028579068bf6a0855f73b5e3bbd149b06fbff49 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 15 Mar 2026 12:43:31 +0300 Subject: [PATCH] Inherited per-user unique IP limit --- src/api/runtime_zero.rs | 2 ++ src/api/users.rs | 11 +++++- src/config/defaults.rs | 4 +++ src/config/hot_reload.rs | 9 +++-- src/config/types.rs | 6 ++++ src/ip_tracker.rs | 65 ++++++++++++++++++++++++++++++++---- src/maestro/mod.rs | 15 ++++++--- src/maestro/runtime_tasks.rs | 16 +++++++-- src/metrics.rs | 33 ++++++++++++++++-- 9 files changed, 143 insertions(+), 18 deletions(-) diff --git a/src/api/runtime_zero.rs b/src/api/runtime_zero.rs index 93e3931..ba89302 100644 --- a/src/api/runtime_zero.rs +++ b/src/api/runtime_zero.rs @@ -90,6 +90,7 @@ pub(super) struct EffectiveMiddleProxyLimits { #[derive(Serialize)] pub(super) struct EffectiveUserIpPolicyLimits { + pub(super) global_each: usize, pub(super) mode: &'static str, pub(super) window_secs: u64, } @@ -262,6 +263,7 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD me2dc_fallback: cfg.general.me2dc_fallback, }, user_ip_policy: EffectiveUserIpPolicyLimits { + global_each: cfg.access.user_max_unique_ips_global_each, mode: user_max_unique_ips_mode_label(cfg.access.user_max_unique_ips_mode), window_secs: cfg.access.user_max_unique_ips_window_secs, }, diff --git a/src/api/users.rs b/src/api/users.rs index da360c7..f339806 100644 --- a/src/api/users.rs +++ b/src/api/users.rs @@ -386,7 +386,16 @@ pub(super) async fn users_from_config( .get(&username) .map(chrono::DateTime::::to_rfc3339), data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(), - max_unique_ips: cfg.access.user_max_unique_ips.get(&username).copied(), + max_unique_ips: cfg + .access + .user_max_unique_ips + .get(&username) + .copied() + .filter(|limit| *limit > 0) + .or( + (cfg.access.user_max_unique_ips_global_each > 0) + .then_some(cfg.access.user_max_unique_ips_global_each), + ), current_connections: stats.get_user_curr_connects(&username), active_unique_ips: active_ip_list.len(), active_unique_ips_list: active_ip_list, diff --git a/src/config/defaults.rs b/src/config/defaults.rs index dd15a87..82ec0b3 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -639,6 +639,10 @@ pub(crate) fn default_user_max_unique_ips_window_secs() -> u64 { DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS } +pub(crate) fn default_user_max_unique_ips_global_each() -> usize { + 0 +} + // Custom deserializer helpers #[derive(Deserialize)] diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index d14e5e2..6f07a4b 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -119,6 +119,7 @@ pub struct HotFields { pub user_expirations: std::collections::HashMap>, pub user_data_quota: std::collections::HashMap, pub user_max_unique_ips: std::collections::HashMap, + pub user_max_unique_ips_global_each: usize, pub user_max_unique_ips_mode: crate::config::UserMaxUniqueIpsMode, pub user_max_unique_ips_window_secs: u64, } @@ -234,6 +235,7 @@ impl HotFields { user_expirations: cfg.access.user_expirations.clone(), user_data_quota: cfg.access.user_data_quota.clone(), user_max_unique_ips: cfg.access.user_max_unique_ips.clone(), + user_max_unique_ips_global_each: cfg.access.user_max_unique_ips_global_each, user_max_unique_ips_mode: cfg.access.user_max_unique_ips_mode, user_max_unique_ips_window_secs: cfg.access.user_max_unique_ips_window_secs, } @@ -535,6 +537,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig { cfg.access.user_expirations = new.access.user_expirations.clone(); cfg.access.user_data_quota = new.access.user_data_quota.clone(); cfg.access.user_max_unique_ips = new.access.user_max_unique_ips.clone(); + cfg.access.user_max_unique_ips_global_each = new.access.user_max_unique_ips_global_each; cfg.access.user_max_unique_ips_mode = new.access.user_max_unique_ips_mode; cfg.access.user_max_unique_ips_window_secs = new.access.user_max_unique_ips_window_secs; @@ -1109,12 +1112,14 @@ fn log_changes( new_hot.user_max_unique_ips.len() ); } - if old_hot.user_max_unique_ips_mode != new_hot.user_max_unique_ips_mode + if old_hot.user_max_unique_ips_global_each != new_hot.user_max_unique_ips_global_each + || old_hot.user_max_unique_ips_mode != new_hot.user_max_unique_ips_mode || old_hot.user_max_unique_ips_window_secs != new_hot.user_max_unique_ips_window_secs { info!( - "config reload: user_max_unique_ips policy mode={:?} window={}s", + "config reload: user_max_unique_ips policy global_each={} mode={:?} window={}s", + new_hot.user_max_unique_ips_global_each, new_hot.user_max_unique_ips_mode, new_hot.user_max_unique_ips_window_secs ); diff --git a/src/config/types.rs b/src/config/types.rs index 29f23e0..e523592 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -1323,6 +1323,11 @@ pub struct AccessConfig { #[serde(default)] pub user_max_unique_ips: HashMap, + /// Global per-user unique IP limit applied when a user has no individual override. + /// `0` disables the inherited limit. + #[serde(default = "default_user_max_unique_ips_global_each")] + pub user_max_unique_ips_global_each: usize, + #[serde(default)] pub user_max_unique_ips_mode: UserMaxUniqueIpsMode, @@ -1348,6 +1353,7 @@ impl Default for AccessConfig { user_expirations: HashMap::new(), user_data_quota: HashMap::new(), user_max_unique_ips: HashMap::new(), + user_max_unique_ips_global_each: default_user_max_unique_ips_global_each(), user_max_unique_ips_mode: UserMaxUniqueIpsMode::default(), user_max_unique_ips_window_secs: default_user_max_unique_ips_window_secs(), replay_check_len: default_replay_check_len(), diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs index d406d51..fce20b6 100644 --- a/src/ip_tracker.rs +++ b/src/ip_tracker.rs @@ -17,6 +17,7 @@ pub struct UserIpTracker { active_ips: Arc>>>, recent_ips: Arc>>>, max_ips: Arc>>, + default_max_ips: Arc>, limit_mode: Arc>, limit_window: Arc>, last_compact_epoch_secs: Arc, @@ -28,6 +29,7 @@ impl UserIpTracker { active_ips: Arc::new(RwLock::new(HashMap::new())), recent_ips: Arc::new(RwLock::new(HashMap::new())), max_ips: Arc::new(RwLock::new(HashMap::new())), + default_max_ips: Arc::new(RwLock::new(0)), limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)), limit_window: Arc::new(RwLock::new(Duration::from_secs(30))), last_compact_epoch_secs: Arc::new(AtomicU64::new(0)), @@ -100,7 +102,10 @@ impl UserIpTracker { limits.remove(username); } - pub async fn load_limits(&self, limits: &HashMap) { + pub async fn load_limits(&self, default_limit: usize, limits: &HashMap) { + let mut default_max_ips = self.default_max_ips.write().await; + *default_max_ips = default_limit; + drop(default_max_ips); let mut max_ips = self.max_ips.write().await; max_ips.clone_from(limits); } @@ -114,9 +119,14 @@ impl UserIpTracker { pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> { self.maybe_compact_empty_users().await; + let default_max_ips = *self.default_max_ips.read().await; let limit = { let max_ips = self.max_ips.read().await; - max_ips.get(username).copied() + max_ips + .get(username) + .copied() + .filter(|limit| *limit > 0) + .or((default_max_ips > 0).then_some(default_max_ips)) }; let mode = *self.limit_mode.read().await; let window = *self.limit_window.read().await; @@ -255,10 +265,16 @@ impl UserIpTracker { pub async fn get_stats(&self) -> Vec<(String, usize, usize)> { let active_ips = self.active_ips.read().await; let max_ips = self.max_ips.read().await; + let default_max_ips = *self.default_max_ips.read().await; let mut stats = Vec::new(); for (username, user_ips) in active_ips.iter() { - let limit = max_ips.get(username).copied().unwrap_or(0); + let limit = max_ips + .get(username) + .copied() + .filter(|limit| *limit > 0) + .or((default_max_ips > 0).then_some(default_max_ips)) + .unwrap_or(0); stats.push((username.clone(), user_ips.len(), limit)); } @@ -293,8 +309,13 @@ impl UserIpTracker { } pub async fn get_user_limit(&self, username: &str) -> Option { + let default_max_ips = *self.default_max_ips.read().await; let max_ips = self.max_ips.read().await; - max_ips.get(username).copied() + max_ips + .get(username) + .copied() + .filter(|limit| *limit > 0) + .or((default_max_ips > 0).then_some(default_max_ips)) } pub async fn format_stats(&self) -> String { @@ -546,7 +567,7 @@ mod tests { config_limits.insert("user1".to_string(), 5); config_limits.insert("user2".to_string(), 3); - tracker.load_limits(&config_limits).await; + tracker.load_limits(0, &config_limits).await; assert_eq!(tracker.get_user_limit("user1").await, Some(5)); assert_eq!(tracker.get_user_limit("user2").await, Some(3)); @@ -560,16 +581,46 @@ mod tests { let mut first = HashMap::new(); first.insert("user1".to_string(), 2); first.insert("user2".to_string(), 3); - tracker.load_limits(&first).await; + tracker.load_limits(0, &first).await; let mut second = HashMap::new(); second.insert("user2".to_string(), 5); - tracker.load_limits(&second).await; + tracker.load_limits(0, &second).await; assert_eq!(tracker.get_user_limit("user1").await, None); assert_eq!(tracker.get_user_limit("user2").await, Some(5)); } + #[tokio::test] + async fn test_global_each_limit_applies_without_user_override() { + let tracker = UserIpTracker::new(); + tracker.load_limits(2, &HashMap::new()).await; + + let ip1 = test_ipv4(172, 16, 0, 1); + let ip2 = test_ipv4(172, 16, 0, 2); + let ip3 = test_ipv4(172, 16, 0, 3); + + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + assert!(tracker.check_and_add("test_user", ip2).await.is_ok()); + assert!(tracker.check_and_add("test_user", ip3).await.is_err()); + assert_eq!(tracker.get_user_limit("test_user").await, Some(2)); + } + + #[tokio::test] + async fn test_user_override_wins_over_global_each_limit() { + let tracker = UserIpTracker::new(); + let mut limits = HashMap::new(); + limits.insert("test_user".to_string(), 1); + tracker.load_limits(3, &limits).await; + + let ip1 = test_ipv4(172, 17, 0, 1); + let ip2 = test_ipv4(172, 17, 0, 2); + + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + assert!(tracker.check_and_add("test_user", ip2).await.is_err()); + assert_eq!(tracker.get_user_limit("test_user").await, Some(1)); + } + #[tokio::test] async fn test_time_window_mode_blocks_recent_ip_churn() { let tracker = UserIpTracker::new(); diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index 5f6c70a..3db7f02 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -168,17 +168,24 @@ pub async fn run() -> std::result::Result<(), Box> { stats.clone(), )); let ip_tracker = Arc::new(UserIpTracker::new()); - ip_tracker.load_limits(&config.access.user_max_unique_ips).await; + ip_tracker + .load_limits( + config.access.user_max_unique_ips_global_each, + &config.access.user_max_unique_ips, + ) + .await; ip_tracker .set_limit_policy( config.access.user_max_unique_ips_mode, config.access.user_max_unique_ips_window_secs, ) .await; - if !config.access.user_max_unique_ips.is_empty() { + if config.access.user_max_unique_ips_global_each > 0 || !config.access.user_max_unique_ips.is_empty() + { info!( - "IP limits configured for {} users", - config.access.user_max_unique_ips.len() + global_each_limit = config.access.user_max_unique_ips_global_each, + explicit_user_limits = config.access.user_max_unique_ips.len(), + "User unique IP limits configured" ); } if !config.network.dns_overrides.is_empty() { diff --git a/src/maestro/runtime_tasks.rs b/src/maestro/runtime_tasks.rs index c8aa534..329e267 100644 --- a/src/maestro/runtime_tasks.rs +++ b/src/maestro/runtime_tasks.rs @@ -131,6 +131,10 @@ pub(crate) async fn spawn_runtime_tasks( let mut config_rx_ip_limits = config_rx.clone(); tokio::spawn(async move { let mut prev_limits = config_rx_ip_limits.borrow().access.user_max_unique_ips.clone(); + let mut prev_global_each = config_rx_ip_limits + .borrow() + .access + .user_max_unique_ips_global_each; let mut prev_mode = config_rx_ip_limits.borrow().access.user_max_unique_ips_mode; let mut prev_window = config_rx_ip_limits .borrow() @@ -143,9 +147,17 @@ pub(crate) async fn spawn_runtime_tasks( } let cfg = config_rx_ip_limits.borrow_and_update().clone(); - if prev_limits != cfg.access.user_max_unique_ips { - ip_tracker_policy.load_limits(&cfg.access.user_max_unique_ips).await; + if prev_limits != cfg.access.user_max_unique_ips + || prev_global_each != cfg.access.user_max_unique_ips_global_each + { + ip_tracker_policy + .load_limits( + cfg.access.user_max_unique_ips_global_each, + &cfg.access.user_max_unique_ips, + ) + .await; prev_limits = cfg.access.user_max_unique_ips.clone(); + prev_global_each = cfg.access.user_max_unique_ips_global_each; } if prev_mode != cfg.access.user_max_unique_ips_mode diff --git a/src/metrics.rs b/src/metrics.rs index c24dc54..02edfd7 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1774,14 +1774,24 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp "# HELP telemt_user_unique_ips_recent_window Per-user unique IPs seen in configured observation window" ); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_recent_window gauge"); - let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)"); + let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Effective per-user unique IP limit (0 means unlimited)"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge"); let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)"); let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge"); for user in unique_users { let current = ip_counts.get(&user).copied().unwrap_or(0); - let limit = config.access.user_max_unique_ips.get(&user).copied().unwrap_or(0); + let limit = config + .access + .user_max_unique_ips + .get(&user) + .copied() + .filter(|limit| *limit > 0) + .or( + (config.access.user_max_unique_ips_global_each > 0) + .then_some(config.access.user_max_unique_ips_global_each), + ) + .unwrap_or(0); let utilization = if limit > 0 { current as f64 / limit as f64 } else { @@ -1904,6 +1914,25 @@ mod tests { assert!(output.contains("telemt_user_unique_ips_recent_window{user=")); } + #[tokio::test] + async fn test_render_uses_global_each_unique_ip_limit() { + let stats = Stats::new(); + stats.increment_user_connects("alice"); + stats.increment_user_curr_connects("alice"); + let tracker = UserIpTracker::new(); + tracker + .check_and_add("alice", "203.0.113.10".parse().unwrap()) + .await + .unwrap(); + let mut config = ProxyConfig::default(); + config.access.user_max_unique_ips_global_each = 2; + + let output = render_metrics(&stats, &config, &tracker).await; + + assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 2")); + assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.500000")); + } + #[tokio::test] async fn test_render_has_type_annotations() { let stats = Stats::new();