diff --git a/src/api/users.rs b/src/api/users.rs index c907070..8e90c7f 100644 --- a/src/api/users.rs +++ b/src/api/users.rs @@ -287,6 +287,7 @@ pub(super) async fn delete_user( .map_err(|e| ApiFailure::bad_request(format!("config validation failed: {}", e)))?; let revision = save_config_to_disk(&shared.config_path, &cfg).await?; drop(_guard); + shared.ip_tracker.remove_user_limit(user).await; shared.ip_tracker.clear_user_ips(user).await; Ok((user.to_string(), revision)) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 86f569b..9ac6c53 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -12,6 +12,7 @@ const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2; const DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS: u64 = 90; const DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT: u8 = 1; const DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS: u64 = 180; +const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5; const DEFAULT_LISTEN_ADDR_IPV6: &str = "::"; @@ -464,6 +465,10 @@ pub(crate) fn default_access_users() -> HashMap { )]) } +pub(crate) fn default_user_max_unique_ips_window_secs() -> u64 { + DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS +} + // Custom deserializer helpers #[derive(Deserialize)] diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index d752d45..e2f4246 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -438,6 +438,16 @@ fn log_changes( new_hot.access.user_max_unique_ips.len() ); } + if old_hot.access.user_max_unique_ips_mode != new_hot.access.user_max_unique_ips_mode + || old_hot.access.user_max_unique_ips_window_secs + != new_hot.access.user_max_unique_ips_window_secs + { + info!( + "config reload: user_max_unique_ips policy mode={:?} window={}s", + new_hot.access.user_max_unique_ips_mode, + new_hot.access.user_max_unique_ips_window_secs + ); + } } /// Load config, validate, diff against current, and broadcast if changed. diff --git a/src/config/load.rs b/src/config/load.rs index b469299..666c938 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -257,6 +257,12 @@ impl ProxyConfig { )); } + if config.access.user_max_unique_ips_window_secs == 0 { + return Err(ProxyError::Config( + "access.user_max_unique_ips_window_secs must be > 0".to_string(), + )); + } + if config.general.me_reinit_every_secs == 0 { return Err(ProxyError::Config( "general.me_reinit_every_secs must be > 0".to_string(), @@ -728,6 +734,14 @@ mod tests { default_api_minimal_runtime_cache_ttl_ms() ); assert_eq!(cfg.access.users, default_access_users()); + assert_eq!( + cfg.access.user_max_unique_ips_mode, + UserMaxUniqueIpsMode::default() + ); + assert_eq!( + cfg.access.user_max_unique_ips_window_secs, + default_user_max_unique_ips_window_secs() + ); } #[test] diff --git a/src/config/types.rs b/src/config/types.rs index ee17108..00260a8 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -183,6 +183,19 @@ impl MeFloorMode { } } +/// Per-user unique source IP limit mode. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum UserMaxUniqueIpsMode { + /// Count only currently active source IPs. + #[default] + ActiveWindow, + /// Count source IPs seen within the recent time window. + TimeWindow, + /// Enforce both active and recent-window limits at the same time. + Combined, +} + /// Telemetry controls for hot-path counters and ME diagnostics. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct TelemetryConfig { @@ -1045,6 +1058,12 @@ pub struct AccessConfig { #[serde(default)] pub user_max_unique_ips: HashMap, + #[serde(default)] + pub user_max_unique_ips_mode: UserMaxUniqueIpsMode, + + #[serde(default = "default_user_max_unique_ips_window_secs")] + pub user_max_unique_ips_window_secs: u64, + #[serde(default = "default_replay_check_len")] pub replay_check_len: usize, @@ -1064,6 +1083,8 @@ impl Default for AccessConfig { user_expirations: HashMap::new(), user_data_quota: HashMap::new(), user_max_unique_ips: HashMap::new(), + user_max_unique_ips_mode: UserMaxUniqueIpsMode::default(), + user_max_unique_ips_window_secs: default_user_max_unique_ips_window_secs(), replay_check_len: default_replay_check_len(), replay_window_secs: default_replay_window_secs(), ignore_time_skew: false, diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs index 32fcbe3..626d591 100644 --- a/src/ip_tracker.rs +++ b/src/ip_tracker.rs @@ -1,153 +1,151 @@ -// src/ip_tracker.rs -// IP address tracking and limiting for users +// IP address tracking and per-user unique IP limiting. #![allow(dead_code)] use std::collections::{HashMap, HashSet}; use std::net::IpAddr; use std::sync::Arc; +use std::time::{Duration, Instant}; + use tokio::sync::RwLock; -/// Трекер уникальных IP-адресов для каждого пользователя MTProxy -/// -/// Предоставляет thread-safe механизм для: -/// - Отслеживания активных IP-адресов каждого пользователя -/// - Ограничения количества уникальных IP на пользователя -/// - Автоматической очистки при отключении клиентов +use crate::config::UserMaxUniqueIpsMode; + #[derive(Debug, Clone)] pub struct UserIpTracker { - /// Маппинг: Имя пользователя -> Множество активных IP-адресов active_ips: Arc>>>, - - /// Маппинг: Имя пользователя -> Максимально разрешенное количество уникальных IP + recent_ips: Arc>>>, max_ips: Arc>>, + limit_mode: Arc>, + limit_window: Arc>, } impl UserIpTracker { - /// Создать новый пустой трекер pub fn new() -> Self { Self { active_ips: Arc::new(RwLock::new(HashMap::new())), + recent_ips: Arc::new(RwLock::new(HashMap::new())), max_ips: Arc::new(RwLock::new(HashMap::new())), + limit_mode: Arc::new(RwLock::new(UserMaxUniqueIpsMode::ActiveWindow)), + limit_window: Arc::new(RwLock::new(Duration::from_secs(30))), } } - /// Установить лимит уникальных IP для конкретного пользователя - /// - /// # Arguments - /// * `username` - Имя пользователя - /// * `max_ips` - Максимальное количество одновременно активных IP-адресов + pub async fn set_limit_policy(&self, mode: UserMaxUniqueIpsMode, window_secs: u64) { + { + let mut current_mode = self.limit_mode.write().await; + *current_mode = mode; + } + let mut current_window = self.limit_window.write().await; + *current_window = Duration::from_secs(window_secs.max(1)); + } + pub async fn set_user_limit(&self, username: &str, max_ips: usize) { let mut limits = self.max_ips.write().await; limits.insert(username.to_string(), max_ips); } - /// Загрузить лимиты из конфигурации - /// - /// # Arguments - /// * `limits` - HashMap с лимитами из config.toml - pub async fn load_limits(&self, limits: &HashMap) { - let mut max_ips = self.max_ips.write().await; - for (user, limit) in limits { - max_ips.insert(user.clone(), *limit); - } + pub async fn remove_user_limit(&self, username: &str) { + let mut limits = self.max_ips.write().await; + limits.remove(username); } - /// Проверить, может ли пользователь подключиться с данного IP-адреса - /// и добавить IP в список активных, если проверка успешна - /// - /// # Arguments - /// * `username` - Имя пользователя - /// * `ip` - IP-адрес клиента - /// - /// # Returns - /// * `Ok(())` - Подключение разрешено, IP добавлен в активные - /// * `Err(String)` - Подключение отклонено с описанием причины - pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> { - // Получаем лимит для пользователя - let max_ips = self.max_ips.read().await; - let limit = match max_ips.get(username) { - Some(limit) => *limit, - None => { - // Если лимит не задан - разрешаем безлимитный доступ - drop(max_ips); - let mut active_ips = self.active_ips.write().await; - let user_ips = active_ips - .entry(username.to_string()) - .or_insert_with(HashSet::new); - user_ips.insert(ip); - return Ok(()); - } - }; - drop(max_ips); + pub async fn load_limits(&self, limits: &HashMap) { + let mut max_ips = self.max_ips.write().await; + max_ips.clone_from(limits); + } + + fn prune_recent(user_recent: &mut HashMap, now: Instant, window: Duration) { + if user_recent.is_empty() { + return; + } + user_recent.retain(|_, seen_at| now.duration_since(*seen_at) <= window); + } + + pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> { + let limit = { + let max_ips = self.max_ips.read().await; + max_ips.get(username).copied() + }; - // Проверяем и обновляем активные IP let mut active_ips = self.active_ips.write().await; - let user_ips = active_ips + let user_active = active_ips .entry(username.to_string()) .or_insert_with(HashSet::new); - // Если IP уже есть в списке - это повторное подключение, разрешаем - if user_ips.contains(&ip) { + if limit.is_none() { + user_active.insert(ip); return Ok(()); } - // Проверяем, не превышен ли лимит - if user_ips.len() >= limit { + let limit = limit.unwrap_or_default(); + let mode = *self.limit_mode.read().await; + let window = *self.limit_window.read().await; + let now = Instant::now(); + + let mut recent_ips = self.recent_ips.write().await; + let user_recent = recent_ips + .entry(username.to_string()) + .or_insert_with(HashMap::new); + Self::prune_recent(user_recent, now, window); + + if user_active.contains(&ip) { + user_recent.insert(ip, now); + return Ok(()); + } + + let active_limit_reached = user_active.len() >= limit; + let recent_limit_reached = user_recent.len() >= limit; + let deny = match mode { + UserMaxUniqueIpsMode::ActiveWindow => active_limit_reached, + UserMaxUniqueIpsMode::TimeWindow => recent_limit_reached, + UserMaxUniqueIpsMode::Combined => active_limit_reached || recent_limit_reached, + }; + + if deny { return Err(format!( - "IP limit reached for user '{}': {}/{} unique IPs already connected", + "IP limit reached for user '{}': active={}/{} recent={}/{} mode={:?}", username, - user_ips.len(), - limit + user_active.len(), + limit, + user_recent.len(), + limit, + mode )); } - // Лимит не превышен - добавляем новый IP - user_ips.insert(ip); + user_active.insert(ip); + user_recent.insert(ip, now); Ok(()) } - /// Удалить IP-адрес из списка активных при отключении клиента - /// - /// # Arguments - /// * `username` - Имя пользователя - /// * `ip` - IP-адрес отключившегося клиента pub async fn remove_ip(&self, username: &str, ip: IpAddr) { let mut active_ips = self.active_ips.write().await; - if let Some(user_ips) = active_ips.get_mut(username) { user_ips.remove(&ip); - - // Если у пользователя не осталось активных IP - удаляем запись - // для экономии памяти if user_ips.is_empty() { active_ips.remove(username); } } + drop(active_ips); + + let mode = *self.limit_mode.read().await; + if matches!(mode, UserMaxUniqueIpsMode::ActiveWindow) { + let mut recent_ips = self.recent_ips.write().await; + if let Some(user_recent) = recent_ips.get_mut(username) { + user_recent.remove(&ip); + if user_recent.is_empty() { + recent_ips.remove(username); + } + } + } } - /// Получить текущее количество активных IP-адресов для пользователя - /// - /// # Arguments - /// * `username` - Имя пользователя - /// - /// # Returns - /// Количество уникальных активных IP-адресов pub async fn get_active_ip_count(&self, username: &str) -> usize { let active_ips = self.active_ips.read().await; - active_ips - .get(username) - .map(|ips| ips.len()) - .unwrap_or(0) + active_ips.get(username).map(|ips| ips.len()).unwrap_or(0) } - /// Получить список всех активных IP-адресов для пользователя - /// - /// # Arguments - /// * `username` - Имя пользователя - /// - /// # Returns - /// Вектор с активными IP-адресами pub async fn get_active_ips(&self, username: &str) -> Vec { let active_ips = self.active_ips.read().await; active_ips @@ -156,49 +154,38 @@ impl UserIpTracker { .unwrap_or_else(Vec::new) } - /// Получить статистику по всем пользователям - /// - /// # Returns - /// Вектор кортежей: (имя_пользователя, количество_активных_IP, лимит) pub async fn get_stats(&self) -> Vec<(String, usize, usize)> { let active_ips = self.active_ips.read().await; let max_ips = self.max_ips.read().await; let mut stats = Vec::new(); - - // Собираем статистику по пользователям с активными подключениями for (username, user_ips) in active_ips.iter() { let limit = max_ips.get(username).copied().unwrap_or(0); stats.push((username.clone(), user_ips.len(), limit)); } - - stats.sort_by(|a, b| a.0.cmp(&b.0)); // Сортируем по имени пользователя + + stats.sort_by(|a, b| a.0.cmp(&b.0)); stats } - /// Очистить все активные IP для пользователя (при необходимости) - /// - /// # Arguments - /// * `username` - Имя пользователя pub async fn clear_user_ips(&self, username: &str) { let mut active_ips = self.active_ips.write().await; active_ips.remove(username); + drop(active_ips); + + let mut recent_ips = self.recent_ips.write().await; + recent_ips.remove(username); } - /// Очистить всю статистику (использовать с осторожностью!) pub async fn clear_all(&self) { let mut active_ips = self.active_ips.write().await; active_ips.clear(); + drop(active_ips); + + let mut recent_ips = self.recent_ips.write().await; + recent_ips.clear(); } - /// Проверить, подключен ли пользователь с данного IP - /// - /// # Arguments - /// * `username` - Имя пользователя - /// * `ip` - IP-адрес для проверки - /// - /// # Returns - /// `true` если IP активен, `false` если нет pub async fn is_ip_active(&self, username: &str, ip: IpAddr) -> bool { let active_ips = self.active_ips.read().await; active_ips @@ -207,46 +194,39 @@ impl UserIpTracker { .unwrap_or(false) } - /// Получить лимит для пользователя - /// - /// # Arguments - /// * `username` - Имя пользователя - /// - /// # Returns - /// Лимит IP-адресов или None, если лимит не установлен pub async fn get_user_limit(&self, username: &str) -> Option { let max_ips = self.max_ips.read().await; max_ips.get(username).copied() } - /// Форматировать статистику в читаемый текст - /// - /// # Returns - /// Строка со статистикой для логов или мониторинга pub async fn format_stats(&self) -> String { let stats = self.get_stats().await; - + if stats.is_empty() { return String::from("No active users"); } - + let mut output = String::from("User IP Statistics:\n"); output.push_str("==================\n"); - + for (username, active_count, limit) in stats { output.push_str(&format!( "User: {:<20} Active IPs: {}/{}\n", username, active_count, - if limit > 0 { limit.to_string() } else { "unlimited".to_string() } + if limit > 0 { + limit.to_string() + } else { + "unlimited".to_string() + } )); - + let ips = self.get_active_ips(&username).await; for ip in ips { - output.push_str(&format!(" └─ {}\n", ip)); + output.push_str(&format!(" - {}\n", ip)); } } - + output } } @@ -257,10 +237,6 @@ impl Default for UserIpTracker { } } -// ============================================================================ -// ТЕСТЫ -// ============================================================================ - #[cfg(test)] mod tests { use super::*; @@ -283,14 +259,10 @@ mod tests { let ip2 = test_ipv4(192, 168, 1, 2); let ip3 = test_ipv4(192, 168, 1, 3); - // Первые два IP должны быть приняты assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); assert!(tracker.check_and_add("test_user", ip2).await.is_ok()); - - // Третий IP должен быть отклонен assert!(tracker.check_and_add("test_user", ip3).await.is_err()); - // Проверяем счетчик assert_eq!(tracker.get_active_ip_count("test_user").await, 2); } @@ -301,13 +273,8 @@ mod tests { let ip1 = test_ipv4(192, 168, 1, 1); - // Первое подключение assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); - - // Повторное подключение с того же IP должно пройти assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); - - // Счетчик не должен увеличиться assert_eq!(tracker.get_active_ip_count("test_user").await, 1); } @@ -320,36 +287,28 @@ mod tests { let ip2 = test_ipv4(192, 168, 1, 2); let ip3 = test_ipv4(192, 168, 1, 3); - // Добавляем два IP assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); assert!(tracker.check_and_add("test_user", ip2).await.is_ok()); - - // Третий не должен пройти assert!(tracker.check_and_add("test_user", ip3).await.is_err()); - // Удаляем первый IP tracker.remove_ip("test_user", ip1).await; - - // Теперь третий должен пройти + assert!(tracker.check_and_add("test_user", ip3).await.is_ok()); - assert_eq!(tracker.get_active_ip_count("test_user").await, 2); } #[tokio::test] async fn test_no_limit() { let tracker = UserIpTracker::new(); - // Не устанавливаем лимит для test_user let ip1 = test_ipv4(192, 168, 1, 1); let ip2 = test_ipv4(192, 168, 1, 2); let ip3 = test_ipv4(192, 168, 1, 3); - // Без лимита все IP должны проходить assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); assert!(tracker.check_and_add("test_user", ip2).await.is_ok()); assert!(tracker.check_and_add("test_user", ip3).await.is_ok()); - + assert_eq!(tracker.get_active_ip_count("test_user").await, 3); } @@ -362,11 +321,9 @@ mod tests { let ip1 = test_ipv4(192, 168, 1, 1); let ip2 = test_ipv4(192, 168, 1, 2); - // user1 может использовать 2 IP assert!(tracker.check_and_add("user1", ip1).await.is_ok()); assert!(tracker.check_and_add("user1", ip2).await.is_ok()); - // user2 может использовать только 1 IP assert!(tracker.check_and_add("user2", ip1).await.is_ok()); assert!(tracker.check_and_add("user2", ip2).await.is_err()); } @@ -379,10 +336,9 @@ mod tests { let ipv4 = test_ipv4(192, 168, 1, 1); let ipv6 = test_ipv6(); - // Должны работать оба типа адресов assert!(tracker.check_and_add("test_user", ipv4).await.is_ok()); assert!(tracker.check_and_add("test_user", ipv6).await.is_ok()); - + assert_eq!(tracker.get_active_ip_count("test_user").await, 2); } @@ -417,8 +373,7 @@ mod tests { let stats = tracker.get_stats().await; assert_eq!(stats.len(), 2); - - // Проверяем наличие обоих пользователей в статистике + assert!(stats.iter().any(|(name, _, _)| name == "user1")); assert!(stats.iter().any(|(name, _, _)| name == "user2")); } @@ -427,10 +382,10 @@ mod tests { async fn test_clear_user_ips() { let tracker = UserIpTracker::new(); let ip1 = test_ipv4(192, 168, 1, 1); - + tracker.check_and_add("test_user", ip1).await.unwrap(); assert_eq!(tracker.get_active_ip_count("test_user").await, 1); - + tracker.clear_user_ips("test_user").await; assert_eq!(tracker.get_active_ip_count("test_user").await, 0); } @@ -440,9 +395,9 @@ mod tests { let tracker = UserIpTracker::new(); let ip1 = test_ipv4(192, 168, 1, 1); let ip2 = test_ipv4(192, 168, 1, 2); - + tracker.check_and_add("test_user", ip1).await.unwrap(); - + assert!(tracker.is_ip_active("test_user", ip1).await); assert!(!tracker.is_ip_active("test_user", ip2).await); } @@ -450,15 +405,85 @@ mod tests { #[tokio::test] async fn test_load_limits_from_config() { let tracker = UserIpTracker::new(); - + let mut config_limits = HashMap::new(); config_limits.insert("user1".to_string(), 5); config_limits.insert("user2".to_string(), 3); - + tracker.load_limits(&config_limits).await; - + assert_eq!(tracker.get_user_limit("user1").await, Some(5)); assert_eq!(tracker.get_user_limit("user2").await, Some(3)); assert_eq!(tracker.get_user_limit("user3").await, None); } + + #[tokio::test] + async fn test_load_limits_replaces_previous_map() { + let tracker = UserIpTracker::new(); + + let mut first = HashMap::new(); + first.insert("user1".to_string(), 2); + first.insert("user2".to_string(), 3); + tracker.load_limits(&first).await; + + let mut second = HashMap::new(); + second.insert("user2".to_string(), 5); + tracker.load_limits(&second).await; + + assert_eq!(tracker.get_user_limit("user1").await, None); + assert_eq!(tracker.get_user_limit("user2").await, Some(5)); + } + + #[tokio::test] + async fn test_time_window_mode_blocks_recent_ip_churn() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("test_user", 1).await; + tracker + .set_limit_policy(UserMaxUniqueIpsMode::TimeWindow, 30) + .await; + + let ip1 = test_ipv4(10, 0, 0, 1); + let ip2 = test_ipv4(10, 0, 0, 2); + + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + tracker.remove_ip("test_user", ip1).await; + assert!(tracker.check_and_add("test_user", ip2).await.is_err()); + } + + #[tokio::test] + async fn test_combined_mode_enforces_active_and_recent_limits() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("test_user", 1).await; + tracker + .set_limit_policy(UserMaxUniqueIpsMode::Combined, 30) + .await; + + let ip1 = test_ipv4(10, 0, 1, 1); + let ip2 = test_ipv4(10, 0, 1, 2); + + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + assert!(tracker.check_and_add("test_user", ip2).await.is_err()); + + tracker.remove_ip("test_user", ip1).await; + assert!(tracker.check_and_add("test_user", ip2).await.is_err()); + } + + #[tokio::test] + async fn test_time_window_expires() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("test_user", 1).await; + tracker + .set_limit_policy(UserMaxUniqueIpsMode::TimeWindow, 1) + .await; + + let ip1 = test_ipv4(10, 1, 0, 1); + let ip2 = test_ipv4(10, 1, 0, 2); + + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + tracker.remove_ip("test_user", ip1).await; + assert!(tracker.check_and_add("test_user", ip2).await.is_err()); + + tokio::time::sleep(Duration::from_millis(1100)).await; + assert!(tracker.check_and_add("test_user", ip2).await.is_ok()); + } } diff --git a/src/main.rs b/src/main.rs index 0aec195..6bff525 100644 --- a/src/main.rs +++ b/src/main.rs @@ -423,6 +423,12 @@ async fn main() -> std::result::Result<(), Box> { // IP Tracker initialization let ip_tracker = Arc::new(UserIpTracker::new()); ip_tracker.load_limits(&config.access.user_max_unique_ips).await; + ip_tracker + .set_limit_policy( + config.access.user_max_unique_ips_mode, + config.access.user_max_unique_ips_window_secs, + ) + .await; if !config.access.user_max_unique_ips.is_empty() { info!("IP limits configured for {} users", config.access.user_max_unique_ips.len()); @@ -847,6 +853,51 @@ async fn main() -> std::result::Result<(), Box> { } }); + let ip_tracker_policy = ip_tracker.clone(); + let mut config_rx_ip_limits = config_rx.clone(); + tokio::spawn(async move { + let mut prev_limits = config_rx_ip_limits + .borrow() + .access + .user_max_unique_ips + .clone(); + let mut prev_mode = config_rx_ip_limits + .borrow() + .access + .user_max_unique_ips_mode; + let mut prev_window = config_rx_ip_limits + .borrow() + .access + .user_max_unique_ips_window_secs; + + loop { + if config_rx_ip_limits.changed().await.is_err() { + break; + } + let cfg = config_rx_ip_limits.borrow_and_update().clone(); + + if prev_limits != cfg.access.user_max_unique_ips { + ip_tracker_policy + .load_limits(&cfg.access.user_max_unique_ips) + .await; + prev_limits = cfg.access.user_max_unique_ips.clone(); + } + + if prev_mode != cfg.access.user_max_unique_ips_mode + || prev_window != cfg.access.user_max_unique_ips_window_secs + { + ip_tracker_policy + .set_limit_policy( + cfg.access.user_max_unique_ips_mode, + cfg.access.user_max_unique_ips_window_secs, + ) + .await; + prev_mode = cfg.access.user_max_unique_ips_mode; + prev_window = cfg.access.user_max_unique_ips_window_secs; + } + } + }); + let beobachten_writer = beobachten.clone(); let config_rx_beobachten = config_rx.clone(); tokio::spawn(async move {