From e78592ef9b2a5f56c577775275465c3bb7601f5b Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 25 Apr 2026 12:00:46 +0300 Subject: [PATCH] Avoid IP tracking when unique-IP limits are disabled and cap beobachten memory Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> Signed-off-by: Alexey <247128645+axkurcom@users.noreply.github.com> --- src/proxy/client.rs | 82 +++++++++++++++--------- src/proxy/tests/client_security_tests.rs | 25 +++++++- src/stats/beobachten.rs | 24 +++++-- 3 files changed, 91 insertions(+), 40 deletions(-) diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 2d4dd42..2ab02ce 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -31,16 +31,24 @@ struct UserConnectionReservation { ip_tracker: Arc, user: String, ip: IpAddr, + tracks_ip: bool, active: bool, } impl UserConnectionReservation { - fn new(stats: Arc, ip_tracker: Arc, user: String, ip: IpAddr) -> Self { + fn new( + stats: Arc, + ip_tracker: Arc, + user: String, + ip: IpAddr, + tracks_ip: bool, + ) -> Self { Self { stats, ip_tracker, user, ip, + tracks_ip, active: true, } } @@ -49,7 +57,9 @@ impl UserConnectionReservation { if !self.active { return; } - self.ip_tracker.remove_ip(&self.user, self.ip).await; + if self.tracks_ip { + self.ip_tracker.remove_ip(&self.user, self.ip).await; + } self.active = false; self.stats.decrement_user_curr_connects(&self.user); } @@ -62,7 +72,9 @@ impl Drop for UserConnectionReservation { } self.active = false; self.stats.decrement_user_curr_connects(&self.user); - self.ip_tracker.enqueue_cleanup(self.user.clone(), self.ip); + if self.tracks_ip { + self.ip_tracker.enqueue_cleanup(self.user.clone(), self.ip); + } } } @@ -1600,19 +1612,22 @@ impl RunningClientHandler { }); } - match ip_tracker.check_and_add(user, peer_addr.ip()).await { - Ok(()) => {} - Err(reason) => { - stats.decrement_user_curr_connects(user); - warn!( - user = %user, - ip = %peer_addr.ip(), - reason = %reason, - "IP limit exceeded" - ); - return Err(ProxyError::ConnectionLimitExceeded { - user: user.to_string(), - }); + let tracks_ip = ip_tracker.get_user_limit(user).await.is_some(); + if tracks_ip { + match ip_tracker.check_and_add(user, peer_addr.ip()).await { + Ok(()) => {} + Err(reason) => { + stats.decrement_user_curr_connects(user); + warn!( + user = %user, + ip = %peer_addr.ip(), + reason = %reason, + "IP limit exceeded" + ); + return Err(ProxyError::ConnectionLimitExceeded { + user: user.to_string(), + }); + } } } @@ -1621,6 +1636,7 @@ impl RunningClientHandler { ip_tracker, user.to_string(), peer_addr.ip(), + tracks_ip, )) } @@ -1663,25 +1679,27 @@ impl RunningClientHandler { }); } - match ip_tracker.check_and_add(user, peer_addr.ip()).await { - Ok(()) => { - ip_tracker.remove_ip(user, peer_addr.ip()).await; - stats.decrement_user_curr_connects(user); - } - Err(reason) => { - stats.decrement_user_curr_connects(user); - warn!( - user = %user, - ip = %peer_addr.ip(), - reason = %reason, - "IP limit exceeded" - ); - return Err(ProxyError::ConnectionLimitExceeded { - user: user.to_string(), - }); + if ip_tracker.get_user_limit(user).await.is_some() { + match ip_tracker.check_and_add(user, peer_addr.ip()).await { + Ok(()) => { + ip_tracker.remove_ip(user, peer_addr.ip()).await; + } + Err(reason) => { + stats.decrement_user_curr_connects(user); + warn!( + user = %user, + ip = %peer_addr.ip(), + reason = %reason, + "IP limit exceeded" + ); + return Err(ProxyError::ConnectionLimitExceeded { + user: user.to_string(), + }); + } } } + stats.decrement_user_curr_connects(user); Ok(()) } } diff --git a/src/proxy/tests/client_security_tests.rs b/src/proxy/tests/client_security_tests.rs index 480b33d..4505e17 100644 --- a/src/proxy/tests/client_security_tests.rs +++ b/src/proxy/tests/client_security_tests.rs @@ -281,8 +281,13 @@ async fn user_connection_reservation_drop_enqueues_cleanup_synchronously() { assert_eq!(ip_tracker.get_active_ip_count(&user).await, 1); assert_eq!(stats.get_user_curr_connects(&user), 1); - let reservation = - UserConnectionReservation::new(stats.clone(), ip_tracker.clone(), user.clone(), ip); + let reservation = UserConnectionReservation::new( + stats.clone(), + ip_tracker.clone(), + user.clone(), + ip, + true, + ); // Drop the reservation synchronously without any tokio::spawn/await yielding! drop(reservation); @@ -320,6 +325,7 @@ async fn relay_task_abort_releases_user_gate_and_ip_reservation() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 8).await; let mut cfg = ProxyConfig::default(); cfg.access.user_max_tcp_conns.insert(user.to_string(), 8); @@ -437,6 +443,7 @@ async fn relay_cutover_releases_user_gate_and_ip_reservation() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 8).await; let mut cfg = ProxyConfig::default(); cfg.access.user_max_tcp_conns.insert(user.to_string(), 8); @@ -2879,6 +2886,7 @@ async fn explicit_reservation_release_cleans_user_and_ip_immediately() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 4).await; let reservation = RunningClientHandler::acquire_user_connection_reservation_static( user, @@ -2917,6 +2925,7 @@ async fn explicit_reservation_release_does_not_double_decrement_on_drop() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 4).await; let reservation = RunningClientHandler::acquire_user_connection_reservation_static( user, @@ -2947,6 +2956,7 @@ async fn drop_fallback_eventually_cleans_user_and_ip_reservation() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 1).await; let reservation = RunningClientHandler::acquire_user_connection_reservation_static( user, @@ -3029,6 +3039,7 @@ async fn release_abort_storm_does_not_leak_user_or_ip_reservations() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, ATTEMPTS + 16).await; for idx in 0..ATTEMPTS { let peer = SocketAddr::new( @@ -3079,6 +3090,7 @@ async fn release_abort_loop_preserves_immediate_same_ip_reacquire() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 1).await; for _ in 0..ITERATIONS { let reservation = RunningClientHandler::acquire_user_connection_reservation_static( @@ -3137,6 +3149,7 @@ async fn adversarial_mixed_release_drop_abort_wave_converges_to_zero() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, RESERVATIONS + 8).await; let mut reservations = Vec::with_capacity(RESERVATIONS); for idx in 0..RESERVATIONS { @@ -3217,6 +3230,8 @@ async fn parallel_users_abort_release_isolation_preserves_independent_cleanup() let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user_a, 64).await; + ip_tracker.set_user_limit(user_b, 64).await; let mut tasks = tokio::task::JoinSet::new(); for idx in 0..64usize { @@ -3278,6 +3293,7 @@ async fn concurrent_release_storm_leaves_zero_user_and_ip_footprint() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, RESERVATIONS + 8).await; let mut reservations = Vec::with_capacity(RESERVATIONS); for idx in 0..RESERVATIONS { @@ -3332,6 +3348,7 @@ async fn relay_connect_error_releases_user_and_ip_before_return() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 8).await; let mut config = ProxyConfig::default(); config.access.user_max_tcp_conns.insert(user.to_string(), 1); @@ -3427,6 +3444,7 @@ async fn mixed_release_and_drop_same_ip_preserves_counter_correctness() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 1).await; let reservation_a = RunningClientHandler::acquire_user_connection_reservation_static( user, @@ -3487,6 +3505,7 @@ async fn drop_one_of_two_same_ip_reservations_keeps_ip_active() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 1).await; let reservation_a = RunningClientHandler::acquire_user_connection_reservation_static( user, @@ -3696,6 +3715,7 @@ async fn cross_thread_drop_uses_captured_runtime_for_ip_cleanup() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 8).await; let reservation = RunningClientHandler::acquire_user_connection_reservation_static( user, @@ -3740,6 +3760,7 @@ async fn immediate_reacquire_after_cross_thread_drop_succeeds() { let stats = Arc::new(Stats::new()); let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 1).await; let reservation = RunningClientHandler::acquire_user_connection_reservation_static( user, diff --git a/src/stats/beobachten.rs b/src/stats/beobachten.rs index 3d3a2da..79b2bcd 100644 --- a/src/stats/beobachten.rs +++ b/src/stats/beobachten.rs @@ -7,6 +7,7 @@ use std::time::{Duration, Instant}; use parking_lot::Mutex; const CLEANUP_INTERVAL: Duration = Duration::from_secs(30); +const MAX_BEOBACHTEN_ENTRIES: usize = 65_536; #[derive(Default)] struct BeobachtenInner { @@ -48,12 +49,23 @@ impl BeobachtenStore { Self::cleanup_if_needed(&mut guard, now, ttl); let key = (class.to_string(), ip); - let entry = guard.entries.entry(key).or_insert(BeobachtenEntry { - tries: 0, - last_seen: now, - }); - entry.tries = entry.tries.saturating_add(1); - entry.last_seen = now; + if let Some(entry) = guard.entries.get_mut(&key) { + entry.tries = entry.tries.saturating_add(1); + entry.last_seen = now; + return; + } + + if guard.entries.len() >= MAX_BEOBACHTEN_ENTRIES { + return; + } + + guard.entries.insert( + key, + BeobachtenEntry { + tries: 1, + last_seen: now, + }, + ); } pub fn snapshot_text(&self, ttl: Duration) -> String {