From d81140ccec601d0d65e2bfa9755d06cd099e20a4 Mon Sep 17 00:00:00 2001 From: David Osipov Date: Tue, 17 Mar 2026 19:39:29 +0400 Subject: [PATCH] Enhance UserConnectionReservation management: add active state and release method, improve cleanup on drop, and implement tests for immediate release and concurrent handling --- src/proxy/client.rs | 24 +++- src/proxy/client_security_tests.rs | 215 +++++++++++++++++++++++++++++ 2 files changed, 238 insertions(+), 1 deletion(-) diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 254d922..f80f74d 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -29,6 +29,7 @@ struct UserConnectionReservation { ip_tracker: Arc, user: String, ip: IpAddr, + active: bool, } impl UserConnectionReservation { @@ -38,12 +39,26 @@ impl UserConnectionReservation { ip_tracker, user, ip, + active: true, } } + + async fn release(mut self) { + if !self.active { + return; + } + self.active = false; + self.stats.decrement_user_curr_connects(&self.user); + self.ip_tracker.remove_ip(&self.user, self.ip).await; + } } impl Drop for UserConnectionReservation { fn drop(&mut self) { + if !self.active { + return; + } + self.active = false; self.stats.decrement_user_curr_connects(&self.user); if let Ok(handle) = tokio::runtime::Handle::try_current() { @@ -53,6 +68,12 @@ impl Drop for UserConnectionReservation { handle.spawn(async move { ip_tracker.remove_ip(&user, ip).await; }); + } else { + warn!( + user = %self.user, + ip = %self.ip, + "UserConnectionReservation dropped without Tokio runtime; IP reservation cleanup skipped" + ); } } } @@ -833,7 +854,7 @@ impl RunningClientHandler { { let user = success.user.clone(); - let _user_limit_reservation = + let user_limit_reservation = match Self::acquire_user_connection_reservation_static( &user, &config, @@ -905,6 +926,7 @@ impl RunningClientHandler { ) .await }; + user_limit_reservation.release().await; relay_result } diff --git a/src/proxy/client_security_tests.rs b/src/proxy/client_security_tests.rs index defb3c0..7047987 100644 --- a/src/proxy/client_security_tests.rs +++ b/src/proxy/client_security_tests.rs @@ -1420,6 +1420,221 @@ async fn tcp_limit_rejection_does_not_reserve_ip_or_trigger_rollback() { ); } +#[tokio::test] +async fn explicit_reservation_release_cleans_user_and_ip_immediately() { + let user = "release-user"; + let peer_addr: SocketAddr = "198.51.100.240:50002".parse().unwrap(); + + let mut config = ProxyConfig::default(); + config + .access + .user_max_tcp_conns + .insert(user.to_string(), 4); + + let stats = Arc::new(Stats::new()); + let ip_tracker = Arc::new(UserIpTracker::new()); + + let reservation = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + peer_addr, + ip_tracker.clone(), + ) + .await + .expect("reservation acquisition must succeed"); + + assert_eq!(stats.get_user_curr_connects(user), 1); + assert_eq!(ip_tracker.get_active_ip_count(user).await, 1); + + reservation.release().await; + + assert_eq!( + stats.get_user_curr_connects(user), + 0, + "explicit release must synchronously free user connection slot" + ); + assert_eq!( + ip_tracker.get_active_ip_count(user).await, + 0, + "explicit release must synchronously remove reserved user IP" + ); +} + +#[tokio::test] +async fn explicit_reservation_release_does_not_double_decrement_on_drop() { + let user = "release-once-user"; + let peer_addr: SocketAddr = "198.51.100.241:50003".parse().unwrap(); + + let mut config = ProxyConfig::default(); + config + .access + .user_max_tcp_conns + .insert(user.to_string(), 4); + + let stats = Arc::new(Stats::new()); + let ip_tracker = Arc::new(UserIpTracker::new()); + + let reservation = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + peer_addr, + ip_tracker, + ) + .await + .expect("reservation acquisition must succeed"); + + reservation.release().await; + + assert_eq!( + stats.get_user_curr_connects(user), + 0, + "release must disarm drop and prevent double decrement" + ); +} + +#[tokio::test] +async fn drop_fallback_eventually_cleans_user_and_ip_reservation() { + let user = "drop-fallback-user"; + let peer_addr: SocketAddr = "198.51.100.242:50004".parse().unwrap(); + + let mut config = ProxyConfig::default(); + config + .access + .user_max_tcp_conns + .insert(user.to_string(), 4); + + let stats = Arc::new(Stats::new()); + let ip_tracker = Arc::new(UserIpTracker::new()); + + let reservation = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + peer_addr, + ip_tracker.clone(), + ) + .await + .expect("reservation acquisition must succeed"); + + assert_eq!(stats.get_user_curr_connects(user), 1); + assert_eq!(ip_tracker.get_active_ip_count(user).await, 1); + + drop(reservation); + + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if stats.get_user_curr_connects(user) == 0 + && ip_tracker.get_active_ip_count(user).await == 0 + { + break; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + }) + .await + .expect("drop fallback must eventually clean both user slot and active IP"); +} + +#[tokio::test] +async fn explicit_release_allows_immediate_cross_ip_reacquire_under_limit() { + let user = "cross-ip-user"; + let peer1: SocketAddr = "198.51.100.243:50005".parse().unwrap(); + let peer2: SocketAddr = "198.51.100.244:50006".parse().unwrap(); + + let mut config = ProxyConfig::default(); + config + .access + .user_max_tcp_conns + .insert(user.to_string(), 4); + + let stats = Arc::new(Stats::new()); + let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 1).await; + + let first = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + peer1, + ip_tracker.clone(), + ) + .await + .expect("first reservation must succeed"); + first.release().await; + + let second = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + peer2, + ip_tracker.clone(), + ) + .await + .expect("second reservation must succeed immediately after explicit release"); + second.release().await; + + assert_eq!(stats.get_user_curr_connects(user), 0); + assert_eq!(ip_tracker.get_active_ip_count(user).await, 0); +} + +#[tokio::test] +async fn concurrent_release_storm_leaves_zero_user_and_ip_footprint() { + const RESERVATIONS: usize = 64; + + let user = "release-storm-user"; + let mut config = ProxyConfig::default(); + config + .access + .user_max_tcp_conns + .insert(user.to_string(), RESERVATIONS + 8); + + let stats = Arc::new(Stats::new()); + let ip_tracker = Arc::new(UserIpTracker::new()); + + let mut reservations = Vec::with_capacity(RESERVATIONS); + for idx in 0..RESERVATIONS { + let ip = std::net::Ipv4Addr::new(203, 0, 113, (idx + 1) as u8); + let peer = SocketAddr::new(IpAddr::V4(ip), 51000 + idx as u16); + let reservation = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + peer, + ip_tracker.clone(), + ) + .await + .expect("reservation acquisition in storm must succeed"); + reservations.push(reservation); + } + + assert_eq!(stats.get_user_curr_connects(user), RESERVATIONS as u64); + assert_eq!(ip_tracker.get_active_ip_count(user).await, RESERVATIONS); + + let mut tasks = tokio::task::JoinSet::new(); + for reservation in reservations { + tasks.spawn(async move { + reservation.release().await; + }); + } + + while let Some(result) = tasks.join_next().await { + result.expect("release task must not panic"); + } + + assert_eq!( + stats.get_user_curr_connects(user), + 0, + "release storm must drain user current-connection counter to zero" + ); + assert_eq!( + ip_tracker.get_active_ip_count(user).await, + 0, + "release storm must clear all active IP entries" + ); +} + #[tokio::test] async fn quota_rejection_does_not_reserve_ip_or_trigger_rollback() { let mut config = ProxyConfig::default();